You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/16 06:26:42 UTC
[iotdb] branch master updated: [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d0d298e986 [IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
d0d298e986 is described below
commit d0d298e98673b44154a54c454abde973ee956a3b
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Sat Jul 16 14:26:37 2022 +0800
[IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
[IOTDB-3720] Unify retry logic of AsyncClientPool in ConfigNode (#6670)
---
.../confignode/client/AsyncDataNodeClientPool.java | 343 ---------------------
.../confignode/client/ConfigNodeRequestType.java | 10 +-
.../confignode/client/DataNodeRequestType.java | 21 +-
.../confignode}/AsyncConfigNodeClientPool.java | 4 +-
.../async/datanode/AsyncDataNodeClientPool.java | 283 +++++++++++++++++
.../async/handlers/AbstractRetryHandler.java | 75 +++++
.../handlers/ConfigNodeHeartbeatHandler.java | 2 +-
.../{ => async}/handlers/CreateRegionHandler.java | 43 ++-
.../handlers/DataNodeHeartbeatHandler.java | 2 +-
.../client/{ => async}/handlers/FlushHandler.java | 34 +-
.../handlers/FunctionManagementHandler.java | 37 ++-
.../client/{ => async}/handlers/SetTTLHandler.java | 34 +-
.../handlers/UpdateRegionRouteMapHandler.java | 31 +-
.../confignode}/SyncConfigNodeClientPool.java | 15 +-
.../datanode}/SyncDataNodeClientPool.java | 23 +-
.../confignode/conf/ConfigNodeRemoveCheck.java | 6 +-
.../confignode/manager/ClusterSchemaManager.java | 41 ++-
.../confignode/manager/DataNodeRemoveManager.java | 15 +-
.../iotdb/confignode/manager/NodeManager.java | 36 ++-
.../iotdb/confignode/manager/PartitionManager.java | 2 +-
.../confignode/manager/PermissionManager.java | 6 +-
.../iotdb/confignode/manager/UDFManager.java | 74 ++---
.../iotdb/confignode/manager/load/LoadManager.java | 45 +--
.../procedure/env/ConfigNodeProcedureEnv.java | 20 +-
.../iotdb/confignode/service/ConfigNode.java | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +-
26 files changed, 651 insertions(+), 563 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
deleted file mode 100644
index 475f75c98c..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
-import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.FlushHandler;
-import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
-import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
-import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-
-/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
-public class AsyncDataNodeClientPool {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
-
- private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
-
- private AsyncDataNodeClientPool() {
- clientManager =
- new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
- .createClientManager(
- new ConfigNodeClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
- }
-
- /**
- * Execute CreateRegionsReq asynchronously
- *
- * @param createRegionGroupsPlan CreateRegionsReq
- * @param ttlMap Map<StorageGroupName, TTL>
- */
- public void createRegions(
- CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
-
- // TODO: Unify retry logic
-
- // Index of each Region
- int index = 0;
- // Number of regions to be created
- int regionNum = 0;
- // Map<TConsensusGroupId, Map<DataNodeId, index>>
- Map<TConsensusGroupId, Map<Integer, Integer>> indexMap = new TreeMap<>();
- // Assign an independent index to each Region
- for (Map.Entry<String, List<TRegionReplicaSet>> entry :
- createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
- for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
- regionNum += regionReplicaSet.getDataNodeLocationsSize();
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
- indexMap
- .computeIfAbsent(regionReplicaSet.getRegionId(), idMap -> new TreeMap<>())
- .put(dataNodeLocation.getDataNodeId(), index);
- index += 1;
- }
- }
- }
-
- BitSet bitSet = new BitSet(regionNum);
- for (int retry = 0; retry < 3; retry++) {
- CountDownLatch latch = new CountDownLatch(regionNum - bitSet.cardinality());
- createRegionGroupsPlan
- .getRegionGroupMap()
- .forEach(
- (storageGroup, regionReplicaSets) -> {
- // Enumerate each RegionReplicaSet
- regionReplicaSets.forEach(
- regionReplicaSet -> {
- // Enumerate each Region
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation -> {
- // Skip those created successfully
- if (!bitSet.get(
- indexMap
- .get(regionReplicaSet.getRegionId())
- .get(dataNodeLocation.getDataNodeId()))) {
- TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
- CreateRegionHandler handler =
- new CreateRegionHandler(
- indexMap
- .get(regionReplicaSet.getRegionId())
- .get(dataNodeLocation.getDataNodeId()),
- bitSet,
- latch,
- regionReplicaSet.getRegionId(),
- dataNodeLocation);
-
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- createSchemaRegion(
- endPoint,
- genCreateSchemaRegionReq(storageGroup, regionReplicaSet),
- handler);
- break;
- case DataRegion:
- createDataRegion(
- endPoint,
- genCreateDataRegionReq(
- storageGroup,
- regionReplicaSet,
- ttlMap.get(storageGroup)),
- handler);
- }
- }
- });
- });
- });
-
- try {
- // Waiting until this batch of create requests done
- latch.await();
- } catch (InterruptedException e) {
- LOGGER.error("ClusterSchemaManager was interrupted during create Regions on DataNodes", e);
- }
-
- if (bitSet.cardinality() == regionNum) {
- // Break if all creations success
- break;
- }
- }
-
- if (bitSet.cardinality() < regionNum) {
- LOGGER.error(
- "Failed to create some SchemaRegions or DataRegions on DataNodes. Please check former logs.");
- }
- }
-
- private TCreateSchemaRegionReq genCreateSchemaRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet) {
- // TODO: Add a retry logic
- TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- return req;
- }
-
- /**
- * Create a SchemaRegion on specific DataNode
- *
- * @param endPoint The specific DataNode
- */
- private void createSchemaRegion(
- TEndPoint endPoint, TCreateSchemaRegionReq req, CreateRegionHandler handler) {
- // TODO: Add a retry logic
- AsyncDataNodeInternalServiceClient client;
- try {
- client = clientManager.borrowClient(endPoint);
- client.createSchemaRegion(req, handler);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- } catch (TException e) {
- LOGGER.error("Create SchemaRegion on DataNode {} failed", endPoint, e);
- }
- }
-
- private TCreateDataRegionReq genCreateDataRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
- // TODO: Add a retry logic
- TCreateDataRegionReq req = new TCreateDataRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- req.setTtl(TTL);
- return req;
- }
-
- /**
- * Create a DataRegion on specific DataNode
- *
- * @param endPoint The specific DataNode
- */
- public void createDataRegion(
- TEndPoint endPoint, TCreateDataRegionReq req, CreateRegionHandler handler) {
- // TODO: Add a retry logic
- AsyncDataNodeInternalServiceClient client;
- try {
- client = clientManager.borrowClient(endPoint);
- client.createDataRegion(req, handler);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- } catch (TException e) {
- LOGGER.error("Create DataRegion on DataNode {} failed", endPoint, e);
- }
- }
-
- /**
- * Only used in LoadManager
- *
- * @param endPoint The specific DataNode
- */
- public void getDataNodeHeartBeat(
- TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
- // TODO: Add a retry logic
- AsyncDataNodeInternalServiceClient client;
- try {
- client = clientManager.borrowClient(endPoint);
- client.getDataNodeHeartBeat(req, handler);
- } catch (Exception e) {
- LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
- }
- }
-
- /**
- * Always call this interface when a DataNode is restarted or removed
- *
- * @param endPoint The specific DataNode
- */
- public void resetClient(TEndPoint endPoint) {
- clientManager.clear(endPoint);
- }
-
- /**
- * Only used in UDFManager
- *
- * @param endPoint The specific DataNode
- */
- public void createFunction(
- TEndPoint endPoint, TCreateFunctionRequest request, FunctionManagementHandler handler) {
- // TODO: Add a retry logic
- try {
- clientManager.borrowClient(endPoint).createFunction(request, handler);
- } catch (Exception e) {
- LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
- }
- }
-
- /**
- * Only used in UDFManager
- *
- * @param endPoint The specific DataNode
- */
- public void dropFunction(
- TEndPoint endPoint, TDropFunctionRequest request, FunctionManagementHandler handler) {
- // TODO: Add a retry logic
- try {
- clientManager.borrowClient(endPoint).dropFunction(request, handler);
- } catch (Exception e) {
- LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
- }
- }
-
- /**
- * Flush on specific DataNode
- *
- * @param endPoint The specific DataNode
- */
- public void flush(TEndPoint endPoint, TFlushReq flushReq, FlushHandler handler) {
- // TODO: Add a retry logic
- try {
- clientManager.borrowClient(endPoint).flush(flushReq, handler);
- } catch (Exception e) {
- LOGGER.error("Failed to asking DataNode to flush: {}", endPoint, e);
- }
- }
-
- /**
- * Set TTL on specific DataNode
- *
- * @param endPoint The specific DataNode
- */
- public void setTTL(TEndPoint endPoint, TSetTTLReq setTTLReq, SetTTLHandler handler) {
- // TODO: Add a retry logic
- try {
- clientManager.borrowClient(endPoint).setTTL(setTTLReq, handler);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- } catch (TException e) {
- LOGGER.error("Set TTL on DataNode {} failed", endPoint, e);
- }
- }
-
- /**
- * Update the RegionRouteMap cache on specific DataNode
- *
- * @param endPoint The specificDataNode
- */
- public void updateRegionRouteMap(
- TEndPoint endPoint, TRegionRouteReq regionRouteReq, UpdateRegionRouteMapHandler handler) {
- // TODO: Add a retry logic
- try {
- clientManager.borrowClient(endPoint).updateRegionCache(regionRouteReq, handler);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- } catch (TException e) {
- LOGGER.error("Update RegionRouteMap on DataNode {} failed", endPoint, e);
- }
- }
-
- // TODO: Is the ClientPool must be a singleton?
- private static class ClientPoolHolder {
-
- private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
-
- private ClientPoolHolder() {
- // Empty constructor
- }
- }
-
- public static AsyncDataNodeClientPool getInstance() {
- return ClientPoolHolder.INSTANCE;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index c2592ceb56..d547759700 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.confignode.client;
public enum ConfigNodeRequestType {
- addConsensusGroup,
- notifyRegisterSuccess,
- registerConfigNode,
- removeConfigNode,
- stopConfigNode;
+ ADD_CONSENSUS_GROUP,
+ NOTIFY_REGISTER_SUCCESS,
+ REGISTER_CONFIG_NODE,
+ REMOVE_CONFIG_NODE,
+ STOP_CONFIG_NODE;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index be28c12a04..f9a96122a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -20,11 +20,18 @@
package org.apache.iotdb.confignode.client;
public enum DataNodeRequestType {
- deleteRegions,
- invalidatePartitionCache,
- invalidatePermissionCache,
- invalidateSchemaCache,
- migrateRegion,
- disableDataNode,
- stopDataNode;
+ DELETE_REGIONS,
+ INVALIDATE_PARTITION_CACHE,
+ INVALIDATE_PERMISSION_CACHE,
+ INVALIDATE_SCHEMA_CACHE,
+ MIGRATE_REGION,
+ DISABLE_DATA_NODE,
+ STOP_DATA_NODE,
+
+ SET_TTL,
+ CREATE_REGIONS,
+ CREATE_FUNCTION,
+ DROP_FUNCTION,
+ FLUSH,
+ UPDATE_REGION_ROUTE_MAP
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
index 00e37493dd..620cdcc4f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/confignode/AsyncConfigNodeClientPool.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.async.confignode;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.slf4j.Logger;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
new file mode 100644
index 0000000000..7475ea3516
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.datanode;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
+import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
+public class AsyncDataNodeClientPool {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
+
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
+
+ private final int retryNum = 6;
+
+ private AsyncDataNodeClientPool() {
+ clientManager =
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new ConfigNodeClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Send asynchronous requests to the specified DataNodes, and resend to the DataNodes that failed
+ * to receive the requests
+ *
+ * @param req request
+ * @param handlerMap Map<index, Handler>
+ * @param dataNodeLocations ConcurrentHashMap<index, TDataNodeLocation> The specific DataNodes
+ */
+ public void sendAsyncRequestToDataNodeWithRetry(
+ Object req,
+ Map<Integer, AbstractRetryHandler> handlerMap,
+ Map<Integer, TDataNodeLocation> dataNodeLocations) {
+ CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
+ if (dataNodeLocations.isEmpty()) {
+ return;
+ }
+ for (int retry = 0; retry < retryNum; retry++) {
+ AbstractRetryHandler handler = null;
+ for (Map.Entry<Integer, TDataNodeLocation> entry : dataNodeLocations.entrySet()) {
+ handler = handlerMap.get(entry.getKey());
+ // If this is not the first attempt, then this operation is a retry.
+ // The handler must be given the CountDownLatch that was re-created for this retry round
+ if (retry != 0) {
+ handler.setCountDownLatch(countDownLatch);
+ }
+ // send request
+ sendAsyncRequestToDataNode(entry.getValue(), req, handler, retry);
+ }
+ try {
+ handler.getCountDownLatch().await();
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted during {} on ConfigNode", handler.getDataNodeRequestType());
+ }
+ // Check if there is a node that fails to send the request, and retry if there is one
+ if (!handler.getDataNodeLocations().isEmpty()) {
+ countDownLatch = new CountDownLatch(handler.getDataNodeLocations().size());
+ } else {
+ break;
+ }
+ }
+ }
+
+ public void sendAsyncRequestToDataNode(
+ TDataNodeLocation dataNodeLocation,
+ Object req,
+ AbstractRetryHandler handler,
+ int retryCount) {
+ AsyncDataNodeInternalServiceClient client;
+ try {
+ client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
+ switch (handler.getDataNodeRequestType()) {
+ case SET_TTL:
+ client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
+ break;
+ case CREATE_REGIONS:
+ TConsensusGroupType regionType =
+ ((CreateRegionHandler) handler).getConsensusGroupId().getType();
+ if (regionType.equals(TConsensusGroupType.SchemaRegion)) {
+ client.createSchemaRegion(
+ (TCreateSchemaRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
+ (CreateRegionHandler) handler);
+ } else if (regionType.equals(TConsensusGroupType.DataRegion)) {
+ client.createDataRegion(
+ (TCreateDataRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
+ (CreateRegionHandler) handler);
+ }
+ break;
+ case CREATE_FUNCTION:
+ client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
+ break;
+ case DROP_FUNCTION:
+ client.dropFunction((TDropFunctionRequest) req, (FunctionManagementHandler) handler);
+ break;
+ case FLUSH:
+ client.flush((TFlushReq) req, (FlushHandler) handler);
+ break;
+ case UPDATE_REGION_ROUTE_MAP:
+ client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
+ break;
+ default:
+ return;
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} failed on ConfigNode {}, because {}, retrying {}...",
+ handler.getDataNodeRequestType(),
+ dataNodeLocation.getInternalEndPoint(),
+ e.getMessage(),
+ retryCount);
+ }
+ }
+
+ /**
+ * Execute CreateRegionsReq asynchronously
+ *
+ * @param createRegionGroupsPlan CreateRegionsReq
+ * @param ttlMap Map<StorageGroupName, TTL>
+ */
+ public void createRegions(
+ CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
+
+ // Number of regions to be created
+ int regionNum = 0;
+ // Assign an independent index to each Region
+ for (Map.Entry<String, List<TRegionReplicaSet>> entry :
+ createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+ for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+ regionNum += regionReplicaSet.getDataNodeLocationsSize();
+ }
+ }
+ Map<Integer, AbstractRetryHandler> handlerMap = new ConcurrentHashMap<>();
+ ConcurrentHashMap<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ Map<Integer, Object> req = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(regionNum);
+ createRegionGroupsPlan
+ .getRegionGroupMap()
+ .forEach(
+ (storageGroup, regionReplicaSets) -> {
+ // Enumerate each RegionReplicaSet
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ // Enumerate each Region
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ handlerMap.put(
+ index.get(),
+ new CreateRegionHandler(
+ index.get(),
+ latch,
+ regionReplicaSet.getRegionId(),
+ dataNodeLocation,
+ dataNodeLocations));
+
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ req.put(
+ index.get(),
+ genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
+ break;
+ case DataRegion:
+ req.put(
+ index.get(),
+ genCreateDataRegionReq(
+ storageGroup,
+ regionReplicaSet,
+ ttlMap.get(storageGroup)));
+ }
+ dataNodeLocations.put(index.getAndIncrement(), dataNodeLocation);
+ });
+ });
+ });
+ sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
+ }
+
+ private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet) {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ return req;
+ }
+
+ private TCreateDataRegionReq genCreateDataRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ req.setTtl(TTL);
+ return req;
+ }
+
+ /**
+ * Only used in LoadManager
+ *
+ * @param endPoint The specific DataNode
+ */
+ public void getDataNodeHeartBeat(
+ TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
+ // TODO: Add a retry logic
+ AsyncDataNodeInternalServiceClient client;
+ try {
+ client = clientManager.borrowClient(endPoint);
+ client.getDataNodeHeartBeat(req, handler);
+ } catch (Exception e) {
+ LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
+ }
+ }
+
+ /**
+ * Always call this interface when a DataNode is restarted or removed
+ *
+ * @param endPoint The specific DataNode
+ */
+ public void resetClient(TEndPoint endPoint) {
+ clientManager.clear(endPoint);
+ }
+
+ // TODO: Must the ClientPool be a singleton?
+ private static class ClientPoolHolder {
+
+ private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
+
+ private ClientPoolHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static AsyncDataNodeClientPool getInstance() {
+ return ClientPoolHolder.INSTANCE;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
new file mode 100644
index 0000000000..518036dd19
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public abstract class AbstractRetryHandler {
+
+ protected final int index;
+
+ protected CountDownLatch countDownLatch;
+ /**
+ * Map<Index, TDataNodeLocation>. A DataNode that successfully executes the request is
+ * removed from this map
+ */
+ protected Map<Integer, TDataNodeLocation> dataNodeLocations;
+
+ protected DataNodeRequestType dataNodeRequestType;
+ /** Target DataNode */
+ protected TDataNodeLocation targetDataNode;
+
+ public AbstractRetryHandler(
+ CountDownLatch countDownLatch,
+ DataNodeRequestType dataNodeRequestType,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocations,
+ int index) {
+ this.countDownLatch = countDownLatch;
+ this.dataNodeLocations = dataNodeLocations;
+ this.dataNodeRequestType = dataNodeRequestType;
+ this.targetDataNode = targetDataNode;
+ this.index = index;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public Map<Integer, TDataNodeLocation> getDataNodeLocations() {
+ return dataNodeLocations;
+ }
+
+ public DataNodeRequestType getDataNodeRequestType() {
+ return dataNodeRequestType;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
index d177a13cad..7eca734b83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/ConfigNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
similarity index 75%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
index fbcca3e121..c1828b64ab 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
@@ -16,69 +16,58 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.BitSet;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
/** Only use CreateRegionHandler when the LoadManager wants to create Regions */
-public class CreateRegionHandler implements AsyncMethodCallback<TSStatus> {
+public class CreateRegionHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
- // Mark BitSet when successfully create
- private final int index;
- private final BitSet bitSet;
-
- // Used to protect asynchronous creation
- private final CountDownLatch latch;
-
// Used for Logger
private final TConsensusGroupId consensusGroupId;
- private final TDataNodeLocation dataNodeLocation;
public CreateRegionHandler(
int index,
- BitSet bitSet,
CountDownLatch latch,
TConsensusGroupId consensusGroupId,
- TDataNodeLocation dataNodeLocation) {
- this.index = index;
- this.bitSet = bitSet;
- this.latch = latch;
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocations) {
+ super(latch, DataNodeRequestType.CREATE_REGIONS, targetDataNode, dataNodeLocations, index);
this.consensusGroupId = consensusGroupId;
- this.dataNodeLocation = dataNodeLocation;
}
@Override
public void onComplete(TSStatus tsStatus) {
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- synchronized (bitSet) {
- bitSet.set(index);
- }
+ getDataNodeLocations().remove(index);
LOGGER.info(
String.format(
"Successfully create %s on DataNode: %s",
- ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), dataNodeLocation));
+ ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode));
} else {
LOGGER.error(
String.format(
"Create %s on DataNode: %s failed, %s",
ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
- dataNodeLocation,
+ targetDataNode,
tsStatus));
}
- latch.countDown();
+ countDownLatch.countDown();
}
@Override
@@ -86,7 +75,11 @@ public class CreateRegionHandler implements AsyncMethodCallback<TSStatus> {
LOGGER.error(
String.format(
"Create %s on DataNode: %s failed, %s",
- ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), dataNodeLocation, e));
- latch.countDown();
+ ConsensusGroupId.formatTConsensusGroupId(consensusGroupId), targetDataNode, e));
+ countDownLatch.countDown();
+ }
+
+ public TConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index d6c02d3712..a148c1cc14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
similarity index 61%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
index e45ec58525..f72cf7ee74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
@@ -16,37 +16,49 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class FlushHandler implements AsyncMethodCallback<TSStatus> {
+public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlushHandler.class);
- private final TDataNodeLocation dataNodeLocation;
- private final CountDownLatch countDownLatch;
private final List<TSStatus> dataNodeResponseStatus;
public FlushHandler(
- TDataNodeLocation dataNodeLocation,
+ TDataNodeLocation targetDataNode,
CountDownLatch countDownLatch,
- List<TSStatus> dataNodeResponseStatus) {
- this.dataNodeLocation = dataNodeLocation;
- this.countDownLatch = countDownLatch;
+ DataNodeRequestType requestType,
+ List<TSStatus> dataNodeResponseStatus,
+ Map<Integer, TDataNodeLocation> dataNodeLocations,
+ int index) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
this.dataNodeResponseStatus = dataNodeResponseStatus;
}
@Override
public void onComplete(TSStatus response) {
+ if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeResponseStatus.add(response);
+ dataNodeLocations.remove(index);
+ LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
+ } else {
+ LOGGER.error("Failed to Flush on DataNode {}, {}", targetDataNode, response);
+ }
countDownLatch.countDown();
- dataNodeResponseStatus.add(response);
}
@Override
@@ -57,9 +69,9 @@ public class FlushHandler implements AsyncMethodCallback<TSStatus> {
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
"Flush error on DataNode: {id="
- + dataNodeLocation.getDataNodeId()
+ + targetDataNode.getDataNodeId()
+ ", internalEndPoint="
- + dataNodeLocation.getInternalEndPoint()
+ + targetDataNode.getInternalEndPoint()
+ "}"
+ exception.getMessage())));
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
similarity index 53%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
index 90a4967e12..306cea3801 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
@@ -17,34 +17,48 @@
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class FunctionManagementHandler implements AsyncMethodCallback<TSStatus> {
+public class FunctionManagementHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FunctionManagementHandler.class);
- private final CountDownLatch countDownLatch;
private final List<TSStatus> dataNodeResponseStatus;
- private final String ip;
- private final int port;
public FunctionManagementHandler(
- CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) {
- this.countDownLatch = countDownLatch;
+ CountDownLatch countDownLatch,
+ TDataNodeLocation targetDataNode,
+ List<TSStatus> dataNodeResponseStatus,
+ DataNodeRequestType requestType,
+ Map<Integer, TDataNodeLocation> dataNodeLocations,
+ int index) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
this.dataNodeResponseStatus = dataNodeResponseStatus;
- this.ip = ip;
- this.port = port;
}
@Override
public void onComplete(TSStatus response) {
- dataNodeResponseStatus.add(response);
+ if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeResponseStatus.add(response);
+ dataNodeLocations.remove(index);
+ LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+ } else {
+ LOGGER.error("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+ }
countDownLatch.countDown();
}
@@ -52,7 +66,8 @@ public class FunctionManagementHandler implements AsyncMethodCallback<TSStatus>
public void onError(Exception exception) {
dataNodeResponseStatus.add(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage("DataNode[" + ip + ":" + port + "] " + exception.getMessage()));
+ .setMessage(targetDataNode + " " + exception.getMessage()));
+ LOGGER.error("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode, exception);
countDownLatch.countDown();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
similarity index 56%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
index c31efe1030..e4fb7d79e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/SetTTLHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
@@ -16,43 +16,47 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class SetTTLHandler implements AsyncMethodCallback<TSStatus> {
+public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCallback<TSStatus> {
private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLHandler.class);
- private final TDataNodeLocation dataNodeLocation;
- private final CountDownLatch latch;
-
- public SetTTLHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
- this.dataNodeLocation = dataNodeLocation;
- this.latch = latch;
+ public SetTTLHandler(
+ CountDownLatch countDownLatch,
+ DataNodeRequestType requestType,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocations,
+ int index) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
}
@Override
- public void onComplete(TSStatus status) {
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Successfully SetTTL on DataNode: {}", dataNodeLocation);
+ public void onComplete(TSStatus response) {
+ if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ getDataNodeLocations().remove(index);
+ LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
} else {
- LOGGER.error("Failed to SetTTL on DataNode: {}, {}", dataNodeLocation, status);
+ LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
}
- latch.countDown();
+ countDownLatch.countDown();
}
@Override
public void onError(Exception e) {
- latch.countDown();
- LOGGER.error("Failed to SetTTL on DataNode: {}", dataNodeLocation);
+ countDownLatch.countDown();
+ LOGGER.error("Failed to SetTTL on DataNode: {}", targetDataNode, e);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
similarity index 68%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index aa97397943..5448f5f2e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -16,43 +16,48 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client.handlers;
+package org.apache.iotdb.confignode.client.async.handlers;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class UpdateRegionRouteMapHandler implements AsyncMethodCallback<TSStatus> {
+public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
- private final TDataNodeLocation dataNodeLocation;
- private final CountDownLatch latch;
-
- public UpdateRegionRouteMapHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
- this.dataNodeLocation = dataNodeLocation;
- this.latch = latch;
+ public UpdateRegionRouteMapHandler(
+ TDataNodeLocation targetDataNode,
+ CountDownLatch countDownLatch,
+ DataNodeRequestType requestType,
+ Map<Integer, TDataNodeLocation> dataNodeLocations,
+ int index) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
}
@Override
public void onComplete(TSStatus status) {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", dataNodeLocation);
+ getDataNodeLocations().remove(index); // map is keyed by request index, not by location
+ LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", targetDataNode);
} else {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode);
}
- latch.countDown();
+ countDownLatch.countDown();
}
@Override
public void onError(Exception e) {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
- latch.countDown();
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode, e);
+ countDownLatch.countDown();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
index 514cd1eab1..2e607a7de0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/confignode/SyncConfigNodeClientPool.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.sync.confignode;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
@@ -65,24 +66,24 @@ public class SyncConfigNodeClientPool {
}
}
- public Object sendSyncRequestToConfigNode(
+ public Object sendSyncRequestToConfigNodeWithRetry(
TEndPoint endPoint, Object req, ConfigNodeRequestType requestType) {
Throwable lastException = null;
for (int retry = 0; retry < retryNum; retry++) {
try (SyncConfigNodeIServiceClient client = clientManager.borrowClient(endPoint)) {
switch (requestType) {
- case registerConfigNode:
+ case REGISTER_CONFIG_NODE:
// Only use registerConfigNode when the ConfigNode is first startup.
return client.registerConfigNode((TConfigNodeRegisterReq) req);
- case addConsensusGroup:
+ case ADD_CONSENSUS_GROUP:
addConsensusGroup((List<TConfigNodeLocation>) req, client);
return null;
- case notifyRegisterSuccess:
+ case NOTIFY_REGISTER_SUCCESS:
client.notifyRegisterSuccess();
return null;
- case removeConfigNode:
+ case REMOVE_CONFIG_NODE:
return removeConfigNode((TConfigNodeLocation) req, client);
- case stopConfigNode:
+ case STOP_CONFIG_NODE:
// Only use stopConfigNode when the ConfigNode is removed.
return client.stopConfigNode((TConfigNodeLocation) req);
default:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
similarity index 93%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 668cbc248c..3610d2403b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.client;
+package org.apache.iotdb.confignode.client.sync.datanode;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -25,6 +25,8 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
@@ -62,25 +64,25 @@ public class SyncDataNodeClientPool {
new ConfigNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
}
- public TSStatus sendSyncRequestToDataNode(
+ public TSStatus sendSyncRequestToDataNodeWithRetry(
TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
Throwable lastException = null;
for (int retry = 0; retry < retryNum; retry++) {
try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
switch (requestType) {
- case invalidatePartitionCache:
+ case INVALIDATE_PARTITION_CACHE:
return client.invalidatePartitionCache((TInvalidateCacheReq) req);
- case invalidateSchemaCache:
+ case INVALIDATE_SCHEMA_CACHE:
return client.invalidateSchemaCache((TInvalidateCacheReq) req);
- case deleteRegions:
+ case DELETE_REGIONS:
return client.deleteRegion((TConsensusGroupId) req);
- case invalidatePermissionCache:
+ case INVALIDATE_PERMISSION_CACHE:
return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
- case migrateRegion:
+ case MIGRATE_REGION:
return client.migrateRegion((TMigrateRegionReq) req);
- case disableDataNode:
+ case DISABLE_DATA_NODE:
return client.disableDataNode((TDisableDataNodeReq) req);
- case stopDataNode:
+ case STOP_DATA_NODE:
return client.stopDataNode();
default:
return RpcUtils.getStatus(
@@ -125,7 +127,8 @@ public class SyncDataNodeClientPool {
for (TConsensusGroupId regionId : regionIds) {
LOGGER.debug("Delete region {} ", regionId);
final TSStatus status =
- sendSyncRequestToDataNode(endPoint, regionId, DataNodeRequestType.deleteRegions);
+ sendSyncRequestToDataNodeWithRetry(
+ endPoint, regionId, DataNodeRequestType.DELETE_REGIONS);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info("DELETE Region {} successfully", regionId);
deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index cc53114cb1..894e712374 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -77,10 +77,10 @@ public class ConfigNodeRemoveCheck {
status =
(TSStatus)
SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNode(
+ .sendSyncRequestToConfigNodeWithRetry(
configNodeLocation.getInternalEndPoint(),
nodeLocation,
- ConfigNodeRequestType.removeConfigNode);
+ ConfigNodeRequestType.REMOVE_CONFIG_NODE);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
break;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 4c2302d3a7..ab88c4add0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetNodesInSchemaTemplatePlan;
@@ -61,10 +63,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
/** The ClusterSchemaManager Manages cluster schema read and write requests. */
public class ClusterSchemaManager {
@@ -163,23 +168,27 @@ public class ClusterSchemaManager {
.getStorageGroupRelatedDataNodes(
setTTLPlan.getStorageGroup(), TConsensusGroupType.DataRegion);
if (dataNodeLocations.size() > 0) {
+ CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
+ Map<Integer, AbstractRetryHandler> handler = new HashMap<>();
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger();
// TODO: Use procedure to protect SetTTL on DataNodes
- CountDownLatch latch = new CountDownLatch(dataNodeLocations.size());
for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
- SetTTLHandler handler = new SetTTLHandler(dataNodeLocation, latch);
- AsyncDataNodeClientPool.getInstance()
- .setTTL(
- dataNodeLocation.getInternalEndPoint(),
- new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
- handler);
- }
-
- try {
- // Waiting until this batch of SetTTL requests done
- latch.await();
- } catch (InterruptedException e) {
- LOGGER.error("ClusterSchemaManager was interrupted during SetTTL on DataNodes", e);
+ handler.put(
+ index.get(),
+ new SetTTLHandler(
+ countDownLatch,
+ DataNodeRequestType.SET_TTL,
+ dataNodeLocation,
+ dataNodeLocationMap,
+ index.get()));
+ dataNodeLocationMap.put(index.getAndIncrement(), dataNodeLocation);
}
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
+ handler,
+ dataNodeLocationMap);
}
return getConsensusManager().write(setTTLPlan).getStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
index d2756389d9..010beee521 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.enums.DataNodeRemoveState;
import org.apache.iotdb.commons.enums.RegionMigrateState;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
@@ -364,7 +364,8 @@ public class DataNodeRemoveManager {
TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
status =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(server, disableReq, DataNodeRequestType.disableDataNode);
+ .sendSyncRequestToDataNodeWithRetry(
+ server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
if (!isSucceed(status)) {
return status;
}
@@ -505,8 +506,8 @@ public class DataNodeRemoveManager {
migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
status =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(
- node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.migrateRegion);
+ .sendSyncRequestToDataNodeWithRetry(
+ node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.MIGRATE_REGION);
// maybe send rpc failed
if (isFailed(status)) {
return status;
@@ -648,8 +649,8 @@ public class DataNodeRemoveManager {
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(
- dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.stopDataNode);
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
LOGGER.info("stop Data Node {} result: {}", dataNode, status);
return status;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 4e01617060..1e726ca8a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodesInfo;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.FlushHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
@@ -53,10 +55,14 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/** NodeManager manages cluster node addition and removal requests */
@@ -452,19 +458,23 @@ public class NodeManager {
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+ Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+ Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger();
for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
- AsyncDataNodeClientPool.getInstance()
- .flush(
- dataNodeInfo.getLocation().getInternalEndPoint(),
- req,
- new FlushHandler(dataNodeInfo.getLocation(), countDownLatch, dataNodeResponseStatus));
- }
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("NodeManager was interrupted during flushing on data nodes", e);
+ handlerMap.put(
+ index.get(),
+ new FlushHandler(
+ dataNodeInfo.getLocation(),
+ countDownLatch,
+ DataNodeRequestType.FLUSH,
+ dataNodeResponseStatus,
+ dataNodeLocations,
+ index.get()));
+ dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
}
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
return dataNodeResponseStatus;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 428a9fa5f4..bd95b0e155 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index d6d5da8bc1..0fdbae755b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
@@ -107,10 +107,10 @@ public class PermissionManager {
for (TDataNodeInfo dataNodeInfo : allDataNodes) {
status =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(
+ .sendSyncRequestToDataNodeWithRetry(
dataNodeInfo.getLocation().getInternalEndPoint(),
req,
- DataNodeRequestType.invalidatePermissionCache);
+ DataNodeRequestType.INVALIDATE_PERMISSION_CACHE);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 22b8f112d8..68daf6461a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -37,8 +39,12 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
public class UDFManager {
@@ -87,24 +93,23 @@ public class UDFManager {
final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
final TCreateFunctionRequest request =
new TCreateFunctionRequest(functionName, className, uris);
-
+ Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+ Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger(0);
for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
- final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
- AsyncDataNodeClientPool.getInstance()
- .createFunction(
- endPoint,
- request,
- new FunctionManagementHandler(
- countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+ handlerMap.put(
+ index.get(),
+ new FunctionManagementHandler(
+ countDownLatch,
+ dataNodeInfo.getLocation(),
+ dataNodeResponseStatus,
+ DataNodeRequestType.CREATE_FUNCTION,
+ dataNodeLocations,
+ index.get()));
+ dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
}
-
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("UDFManager was interrupted during creating functions on data nodes", e);
- }
-
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
return dataNodeResponseStatus;
}
@@ -131,24 +136,23 @@ public class UDFManager {
Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
-
+ Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+ Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger(0);
for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
- final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
- AsyncDataNodeClientPool.getInstance()
- .dropFunction(
- endPoint,
- request,
- new FunctionManagementHandler(
- countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+ handlerMap.put(
+ index.get(),
+ new FunctionManagementHandler(
+ countDownLatch,
+ dataNodeInfo.getLocation(),
+ dataNodeResponseStatus,
+ DataNodeRequestType.DROP_FUNCTION,
+ dataNodeLocations,
+ index.get()));
+ dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
}
-
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("UDFManager was interrupted during dropping functions on data nodes", e);
- }
-
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
return dataNodeResponseStatus;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 348744ff8c..6beed810d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -31,11 +32,13 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -300,20 +303,26 @@ public class LoadManager {
CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
-
+ Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
+ Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ AtomicInteger index = new AtomicInteger();
onlineDataNodes.forEach(
- dataNodeInfo ->
- AsyncDataNodeClientPool.getInstance()
- .updateRegionRouteMap(
- dataNodeInfo.getLocation().getInternalEndPoint(),
- new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
- new UpdateRegionRouteMapHandler(dataNodeInfo.getLocation(), latch)));
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- LOGGER.warn("Broadcast the latest RegionRouteMap was interrupted!");
- }
+ dataNodeInfo -> {
+ handlerMap.put(
+ index.get(),
+ new UpdateRegionRouteMapHandler(
+ dataNodeInfo.getLocation(),
+ latch,
+ DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+ dataNodeLocations,
+ index.get()));
+ dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
+ });
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
+ handlerMap,
+ dataNodeLocations);
LOGGER.info("Broadcast the latest RegionRouteMap finished.");
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 1a6986813f..631a31cf03 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.exception.AddPeerException;
@@ -115,16 +115,16 @@ public class ConfigNodeProcedureEnv {
for (TDataNodeInfo dataNodeInfo : allDataNodes) {
final TSStatus invalidateSchemaStatus =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(
+ .sendSyncRequestToDataNodeWithRetry(
dataNodeInfo.getLocation().getInternalEndPoint(),
invalidateCacheReq,
- DataNodeRequestType.invalidateSchemaCache);
+ DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
final TSStatus invalidatePartitionStatus =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNode(
+ .sendSyncRequestToDataNodeWithRetry(
dataNodeInfo.getLocation().getInternalEndPoint(),
invalidateCacheReq,
- DataNodeRequestType.invalidatePartitionCache);
+ DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
LOG.error(
"Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
@@ -151,10 +151,10 @@ public class ConfigNodeProcedureEnv {
new ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
configNodeLocations.add(tConfigNodeLocation);
SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNode(
+ .sendSyncRequestToConfigNodeWithRetry(
tConfigNodeLocation.getInternalEndPoint(),
configNodeLocations,
- ConfigNodeRequestType.addConsensusGroup);
+ ConfigNodeRequestType.ADD_CONSENSUS_GROUP);
}
/**
@@ -183,10 +183,10 @@ public class ConfigNodeProcedureEnv {
*/
public void notifyRegisterSuccess(TConfigNodeLocation configNodeLocation) {
SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNode(
+ .sendSyncRequestToConfigNodeWithRetry(
configNodeLocation.getInternalEndPoint(),
null,
- ConfigNodeRequestType.notifyRegisterSuccess);
+ ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
public ReentrantLock getAddConfigNodeLock() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 2b39975316..3aa8035c17 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -192,8 +192,8 @@ public class ConfigNode implements ConfigNodeMBean {
TConfigNodeRegisterResp resp =
(TConfigNodeRegisterResp)
SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNode(
- targetConfigNode, req, ConfigNodeRequestType.registerConfigNode);
+ .sendSyncRequestToConfigNodeWithRetry(
+ targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 118c98a2ec..298c287d7f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -461,10 +461,10 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
status =
(TSStatus)
SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNode(
+ .sendSyncRequestToConfigNodeWithRetry(
configNodeLocation.getInternalEndPoint(),
configNodeLocation,
- ConfigNodeRequestType.stopConfigNode);
+ ConfigNodeRequestType.STOP_CONFIG_NODE);
}
// Print log to record the ConfigNode that performs the RemoveConfigNodeRequest