You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/22 06:52:18 UTC
[iotdb] branch master updated: [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 842105470d [IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
842105470d is described below
commit 842105470de4ad166d2307f4f05ffaa388e66d0c
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Fri Jul 22 14:52:12 2022 +0800
[IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
[IOTDB-3880] Optimize asyncclientpool retry logic (#6735)
---
.../confignode/client/DataNodeRequestType.java | 3 +-
.../async/datanode/AsyncDataNodeClientPool.java | 221 ++++++++++++---------
.../async/handlers/AbstractRetryHandler.java | 33 +--
.../client/async/handlers/CreateRegionHandler.java | 14 +-
.../client/async/handlers/FlushHandler.java | 16 +-
.../async/handlers/FunctionManagementHandler.java | 11 +-
.../client/async/handlers/SetTTLHandler.java | 7 +-
.../handlers/UpdateRegionRouteMapHandler.java | 9 +-
.../confignode/manager/ClusterSchemaManager.java | 30 +--
.../iotdb/confignode/manager/NodeManager.java | 43 ++--
.../iotdb/confignode/manager/UDFManager.java | 63 ++----
.../iotdb/confignode/manager/load/LoadManager.java | 31 +--
12 files changed, 219 insertions(+), 262 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index f9a96122a4..715ebfd8de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -29,7 +29,8 @@ public enum DataNodeRequestType {
STOP_DATA_NODE,
SET_TTL,
- CREATE_REGIONS,
+ CREATE_DATA_REGIONS,
+ CREATE_SCHEMA_REGIONS,
CREATE_FUNCTION,
DROP_FUNCTION,
FLUSH,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 7475ea3516..91c10c5991 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -18,15 +18,15 @@
*/
package org.apache.iotdb.confignode.client.async.datanode;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.ConfigNodeClientPoolFactory;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
@@ -49,7 +49,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
public class AsyncDataNodeClientPool {
@@ -72,38 +71,63 @@ public class AsyncDataNodeClientPool {
* receive the requests
*
* @param req request
- * @param handlerMap Map<index, Handler>
- * @param dataNodeLocations ConcurrentHashMap<index, TDataNodeLocation> The specific DataNodes
+ * @param dataNodeLocationMap Map<DataNodeId, TDataNodeLocation>
+ * @param requestType DataNodeRequestType
+ * @param dataNodeResponseStatus response list. Used by CREATE_FUNCTION, DROP_FUNCTION and FLUSH
*/
public void sendAsyncRequestToDataNodeWithRetry(
Object req,
- Map<Integer, AbstractRetryHandler> handlerMap,
- Map<Integer, TDataNodeLocation> dataNodeLocations) {
- CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
- if (dataNodeLocations.isEmpty()) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ DataNodeRequestType requestType,
+ List<TSStatus> dataNodeResponseStatus) {
+ if (dataNodeLocationMap.isEmpty()) {
return;
}
for (int retry = 0; retry < retryNum; retry++) {
- AbstractRetryHandler handler = null;
- for (Map.Entry<Integer, TDataNodeLocation> entry : dataNodeLocations.entrySet()) {
- handler = handlerMap.get(entry.getKey());
- // If it is not the first request, then prove that this operation is a retry.
- // The count of countDownLatch needs to be updated
- if (retry != 0) {
- handler.setCountDownLatch(countDownLatch);
+ CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+ for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
+ AbstractRetryHandler handler;
+ switch (requestType) {
+ case SET_TTL:
+ handler =
+ new SetTTLHandler(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ break;
+ case CREATE_FUNCTION:
+ case DROP_FUNCTION:
+ handler =
+ new FunctionManagementHandler(
+ countDownLatch,
+ requestType,
+ targetDataNode,
+ dataNodeLocationMap,
+ dataNodeResponseStatus);
+ break;
+ case FLUSH:
+ handler =
+ new FlushHandler(
+ countDownLatch,
+ requestType,
+ targetDataNode,
+ dataNodeLocationMap,
+ dataNodeResponseStatus);
+ break;
+ case UPDATE_REGION_ROUTE_MAP:
+ handler =
+ new UpdateRegionRouteMapHandler(
+ countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ break;
+ default:
+ return;
}
- // send request
- sendAsyncRequestToDataNode(entry.getValue(), req, handler, retry);
+ sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
}
try {
- handler.getCountDownLatch().await();
+ countDownLatch.await();
} catch (InterruptedException e) {
- LOGGER.error("Interrupted during {} on ConfigNode", handler.getDataNodeRequestType());
+ LOGGER.error("Interrupted during {} on ConfigNode", requestType);
}
// Check if there is a node that fails to send the request, and retry if there is one
- if (!handler.getDataNodeLocations().isEmpty()) {
- countDownLatch = new CountDownLatch(handler.getDataNodeLocations().size());
- } else {
+ if (dataNodeLocationMap.isEmpty()) {
break;
}
}
@@ -118,21 +142,11 @@ public class AsyncDataNodeClientPool {
try {
client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
switch (handler.getDataNodeRequestType()) {
- case SET_TTL:
- client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
+ case CREATE_DATA_REGIONS:
+ client.createDataRegion((TCreateDataRegionReq) req, (CreateRegionHandler) handler);
break;
- case CREATE_REGIONS:
- TConsensusGroupType regionType =
- ((CreateRegionHandler) handler).getConsensusGroupId().getType();
- if (regionType.equals(TConsensusGroupType.SchemaRegion)) {
- client.createSchemaRegion(
- (TCreateSchemaRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
- (CreateRegionHandler) handler);
- } else if (regionType.equals(TConsensusGroupType.DataRegion)) {
- client.createDataRegion(
- (TCreateDataRegionReq) ((Map<Integer, Object>) req).get(handler.getIndex()),
- (CreateRegionHandler) handler);
- }
+ case CREATE_SCHEMA_REGIONS:
+ client.createSchemaRegion((TCreateSchemaRegionReq) req, (CreateRegionHandler) handler);
break;
case CREATE_FUNCTION:
client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
@@ -147,7 +161,6 @@ public class AsyncDataNodeClientPool {
client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
break;
default:
- return;
}
} catch (Exception e) {
LOGGER.warn(
@@ -167,61 +180,85 @@ public class AsyncDataNodeClientPool {
*/
public void createRegions(
CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
-
- // Number of regions to be created
- int regionNum = 0;
- // Assign an independent index to each Region
- for (Map.Entry<String, List<TRegionReplicaSet>> entry :
- createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
- for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
- regionNum += regionReplicaSet.getDataNodeLocationsSize();
+ // Several createRegion requests may be sent to the same DataNode, so the map is keyed by a
+ // per-request index (Map<index, TDataNodeLocation>) instead of by DataNodeId
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+ int index = 0;
+ // Record the target DataNode of each request to be sent
+ for (List<TRegionReplicaSet> regionReplicaSets :
+ createRegionGroupsPlan.getRegionGroupMap().values()) {
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ dataNodeLocationMap.put(index++, dataNodeLocation);
+ }
+ }
+ }
+ if (dataNodeLocationMap.isEmpty()) {
+ return;
+ }
+ for (int retry = 0; retry < retryNum; retry++) {
+ index = 0;
+ CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
+ for (Map.Entry<String, List<TRegionReplicaSet>> entry :
+ createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
+ // Enumerate each RegionReplicaSet
+ for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+ // Enumerate each Region
+ for (TDataNodeLocation targetDataNode : regionReplicaSet.getDataNodeLocations()) {
+ if (dataNodeLocationMap.containsKey(index)) {
+ AbstractRetryHandler handler;
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ handler =
+ new CreateRegionHandler(
+ countDownLatch,
+ DataNodeRequestType.CREATE_SCHEMA_REGIONS,
+ regionReplicaSet.regionId,
+ targetDataNode,
+ dataNodeLocationMap,
+ index++);
+ sendAsyncRequestToDataNode(
+ targetDataNode,
+ genCreateSchemaRegionReq(entry.getKey(), regionReplicaSet),
+ handler,
+ retry);
+ break;
+ case DataRegion:
+ handler =
+ new CreateRegionHandler(
+ countDownLatch,
+ DataNodeRequestType.CREATE_DATA_REGIONS,
+ regionReplicaSet.regionId,
+ targetDataNode,
+ dataNodeLocationMap,
+ index++);
+ sendAsyncRequestToDataNode(
+ targetDataNode,
+ genCreateDataRegionReq(
+ entry.getKey(), regionReplicaSet, ttlMap.get(entry.getKey())),
+ handler,
+ retry);
+ break;
+ default:
+ return;
+ }
+ } else {
+ index++;
+ }
+ }
+ }
+ }
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted during createRegions on ConfigNode");
+ }
+ // Check if there is a node that fails to send the request, and
+ // retry if there is one
+ if (dataNodeLocationMap.isEmpty()) {
+ break;
}
}
- Map<Integer, AbstractRetryHandler> handlerMap = new ConcurrentHashMap<>();
- ConcurrentHashMap<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
- Map<Integer, Object> req = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger();
- CountDownLatch latch = new CountDownLatch(regionNum);
- createRegionGroupsPlan
- .getRegionGroupMap()
- .forEach(
- (storageGroup, regionReplicaSets) -> {
- // Enumerate each RegionReplicaSet
- regionReplicaSets.forEach(
- regionReplicaSet -> {
- // Enumerate each Region
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation -> {
- handlerMap.put(
- index.get(),
- new CreateRegionHandler(
- index.get(),
- latch,
- regionReplicaSet.getRegionId(),
- dataNodeLocation,
- dataNodeLocations));
-
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- req.put(
- index.get(),
- genCreateSchemaRegionReq(storageGroup, regionReplicaSet));
- break;
- case DataRegion:
- req.put(
- index.get(),
- genCreateDataRegionReq(
- storageGroup,
- regionReplicaSet,
- ttlMap.get(storageGroup)));
- }
- dataNodeLocations.put(index.getAndIncrement(), dataNodeLocation);
- });
- });
- });
- sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
}
private TCreateSchemaRegionReq genCreateSchemaRegionReq(
@@ -244,7 +281,7 @@ public class AsyncDataNodeClientPool {
/**
* Only used in LoadManager
*
- * @param endPoint
+ * @param endPoint The specific DataNode
*/
public void getDataNodeHeartBeat(
TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
index 518036dd19..6d882ecbd2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AbstractRetryHandler.java
@@ -27,15 +27,14 @@ import java.util.concurrent.CountDownLatch;
public abstract class AbstractRetryHandler {
- protected final int index;
-
protected CountDownLatch countDownLatch;
+
/**
- * Map<Index, TDataNodeLocation> The DataNode that successfully execute the request will be
- * removed from this map
+ * Map<DataNodeId, TDataNodeLocation> The DataNode that successfully executes the request will be
+ * removed from this map
*/
- protected Map<Integer, TDataNodeLocation> dataNodeLocations;
-
+ protected Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+ /** Request type to DataNode */
protected DataNodeRequestType dataNodeRequestType;
/** Target DataNode */
protected TDataNodeLocation targetDataNode;
@@ -44,32 +43,14 @@ public abstract class AbstractRetryHandler {
CountDownLatch countDownLatch,
DataNodeRequestType dataNodeRequestType,
TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocations,
- int index) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
this.countDownLatch = countDownLatch;
- this.dataNodeLocations = dataNodeLocations;
+ this.dataNodeLocationMap = dataNodeLocationMap;
this.dataNodeRequestType = dataNodeRequestType;
this.targetDataNode = targetDataNode;
- this.index = index;
- }
-
- public void setCountDownLatch(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
-
- public CountDownLatch getCountDownLatch() {
- return countDownLatch;
- }
-
- public Map<Integer, TDataNodeLocation> getDataNodeLocations() {
- return dataNodeLocations;
}
public DataNodeRequestType getDataNodeRequestType() {
return dataNodeRequestType;
}
-
- public int getIndex() {
- return index;
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
index c1828b64ab..3cef7ed55a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/CreateRegionHandler.java
@@ -38,23 +38,29 @@ public class CreateRegionHandler extends AbstractRetryHandler
private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionHandler.class);
+ // The index of dataNodeLocations
+ // We use Index instead of DataNodeId because it is possible to send multiple createRegion
+ // requests to the same DataNode
+ private final int index;
// Used for Logger
private final TConsensusGroupId consensusGroupId;
public CreateRegionHandler(
- int index,
CountDownLatch latch,
+ DataNodeRequestType requestType,
TConsensusGroupId consensusGroupId,
TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocations) {
- super(latch, DataNodeRequestType.CREATE_REGIONS, targetDataNode, dataNodeLocations, index);
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ int index) {
+ super(latch, requestType, targetDataNode, dataNodeLocationMap);
this.consensusGroupId = consensusGroupId;
+ this.index = index;
}
@Override
public void onComplete(TSStatus tsStatus) {
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- getDataNodeLocations().remove(index);
+ dataNodeLocationMap.remove(index);
LOGGER.info(
String.format(
"Successfully create %s on DataNode: %s",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
index f72cf7ee74..aa0c366ced 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FlushHandler.java
@@ -39,13 +39,12 @@ public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCal
private final List<TSStatus> dataNodeResponseStatus;
public FlushHandler(
- TDataNodeLocation targetDataNode,
CountDownLatch countDownLatch,
DataNodeRequestType requestType,
- List<TSStatus> dataNodeResponseStatus,
- Map<Integer, TDataNodeLocation> dataNodeLocations,
- int index) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ List<TSStatus> dataNodeResponseStatus) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
this.dataNodeResponseStatus = dataNodeResponseStatus;
}
@@ -53,10 +52,13 @@ public class FlushHandler extends AbstractRetryHandler implements AsyncMethodCal
public void onComplete(TSStatus response) {
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataNodeResponseStatus.add(response);
- dataNodeLocations.remove(index);
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
LOGGER.info("Successfully Flush on DataNode: {}", targetDataNode);
} else {
- LOGGER.error("Failed to Flush on DataNode {}, {}", dataNodeLocations, response);
+ LOGGER.error(
+ "Failed to Flush on DataNode {}, {}",
+ dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
+ response);
}
countDownLatch.countDown();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
index 306cea3801..e48545a8ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/FunctionManagementHandler.java
@@ -41,12 +41,11 @@ public class FunctionManagementHandler extends AbstractRetryHandler
public FunctionManagementHandler(
CountDownLatch countDownLatch,
- TDataNodeLocation targetDataNode,
- List<TSStatus> dataNodeResponseStatus,
DataNodeRequestType requestType,
- Map<Integer, TDataNodeLocation> dataNodeLocations,
- int index) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ List<TSStatus> dataNodeResponseStatus) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
this.dataNodeResponseStatus = dataNodeResponseStatus;
}
@@ -54,7 +53,7 @@ public class FunctionManagementHandler extends AbstractRetryHandler
public void onComplete(TSStatus response) {
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataNodeResponseStatus.add(response);
- dataNodeLocations.remove(index);
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
} else {
LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
index e4fb7d79e6..6792a49e7f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetTTLHandler.java
@@ -38,15 +38,14 @@ public class SetTTLHandler extends AbstractRetryHandler implements AsyncMethodCa
CountDownLatch countDownLatch,
DataNodeRequestType requestType,
TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocations,
- int index) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
}
@Override
public void onComplete(TSStatus response) {
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- getDataNodeLocations().remove(index);
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
LOGGER.info("Successfully SetTTL on DataNode: {}", targetDataNode);
} else {
LOGGER.error("Failed to SetTTL on DataNode: {}, {}", targetDataNode, response);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index 5448f5f2e1..4877fb70a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -36,18 +36,17 @@ public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
public UpdateRegionRouteMapHandler(
- TDataNodeLocation targetDataNode,
CountDownLatch countDownLatch,
DataNodeRequestType requestType,
- Map<Integer, TDataNodeLocation> dataNodeLocations,
- int index) {
- super(countDownLatch, requestType, targetDataNode, dataNodeLocations, index);
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
}
@Override
public void onComplete(TSStatus status) {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- getDataNodeLocations().remove(targetDataNode);
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", targetDataNode);
} else {
LOGGER.error("Update RegionRouteMap on DataNode: {} failed", targetDataNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index ab88c4add0..a3eef8ecbe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetNodesInSchemaTemplatePlan;
@@ -63,13 +61,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
/** The ClusterSchemaManager Manages cluster schema read and write requests. */
public class ClusterSchemaManager {
@@ -162,33 +157,22 @@ public class ClusterSchemaManager {
TSStatusCode.STORAGE_GROUP_NOT_EXIST,
"storageGroup " + setTTLPlan.getStorageGroup() + " does not exist");
}
-
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
Set<TDataNodeLocation> dataNodeLocations =
getPartitionManager()
.getStorageGroupRelatedDataNodes(
setTTLPlan.getStorageGroup(), TConsensusGroupType.DataRegion);
+ dataNodeLocations.forEach(
+ dataNodeLocation ->
+ dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation));
if (dataNodeLocations.size() > 0) {
- CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocations.size());
- Map<Integer, AbstractRetryHandler> handler = new HashMap<>();
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger();
// TODO: Use procedure to protect SetTTL on DataNodes
- for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
- handler.put(
- index.get(),
- new SetTTLHandler(
- countDownLatch,
- DataNodeRequestType.SET_TTL,
- dataNodeLocation,
- dataNodeLocationMap,
- index.get()));
- dataNodeLocationMap.put(index.getAndIncrement(), dataNodeLocation);
- }
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TSetTTLReq(setTTLPlan.getStorageGroup(), setTTLPlan.getTTL()),
- handler,
- dataNodeLocationMap);
+ dataNodeLocationMap,
+ DataNodeRequestType.SET_TTL,
+ null);
}
return getConsensusManager().write(setTTLPlan).getStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 03219060df..855bc4ec7a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -55,13 +53,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/** NodeManager manages cluster node addition and removal requests */
@@ -179,6 +174,7 @@ public class NodeManager {
public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
}
+
/**
* Only leader use this interface
*
@@ -208,6 +204,18 @@ public class NodeManager {
return nodeInfo.getRegisteredDataNodes(dataNodeId);
}
+ public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations(int dataNodeId) {
+ Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
+ nodeInfo
+ .getRegisteredDataNodes(dataNodeId)
+ .forEach(
+ dataNodeConfiguration ->
+ dataNodeLocations.put(
+ dataNodeConfiguration.getLocation().getDataNodeId(),
+ dataNodeConfiguration.getLocation()));
+ return dataNodeLocations;
+ }
+
public List<TDataNodesInfo> getRegisteredDataNodesInfoList() {
List<TDataNodesInfo> dataNodesLocations = new ArrayList<>();
List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes(-1);
@@ -425,28 +433,13 @@ public class NodeManager {
}
public List<TSStatus> flush(TFlushReq req) {
- List<TDataNodeConfiguration> registeredDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(req.dataNodeId);
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations(req.dataNodeId);
List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
- CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
- Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
- Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger();
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- handlerMap.put(
- index.get(),
- new FlushHandler(
- dataNodeInfo.getLocation(),
- countDownLatch,
- DataNodeRequestType.FLUSH,
- dataNodeResponseStatus,
- dataNodeLocations,
- index.get()));
- dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
- }
+ Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(req, handlerMap, dataNodeLocations);
+ .sendAsyncRequestToDataNodeWithRetry(
+ req, dataNodeLocationMap, DataNodeRequestType.FLUSH, dataNodeResponseStatus);
return dataNodeResponseStatus;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index f9c789d95d..32bf01f437 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -19,13 +19,10 @@
package org.apache.iotdb.confignode.manager;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
-import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -39,12 +36,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
public class UDFManager {
@@ -86,30 +79,18 @@ public class UDFManager {
private List<TSStatus> createFunctionOnDataNodes(
String functionName, String className, List<String> uris) {
- final List<TDataNodeConfiguration> registeredDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
- final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+ Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TCreateFunctionRequest request =
new TCreateFunctionRequest(functionName, className, uris);
- Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
- Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger(0);
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- handlerMap.put(
- index.get(),
- new FunctionManagementHandler(
- countDownLatch,
- dataNodeInfo.getLocation(),
- dataNodeResponseStatus,
- DataNodeRequestType.CREATE_FUNCTION,
- dataNodeLocations,
- index.get()));
- dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
- }
AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
+ .sendAsyncRequestToDataNodeWithRetry(
+ request,
+ dataNodeLocationMap,
+ DataNodeRequestType.CREATE_FUNCTION,
+ dataNodeResponseStatus);
return dataNodeResponseStatus;
}
@@ -130,29 +111,17 @@ public class UDFManager {
}
private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
- final List<TDataNodeConfiguration> registeredDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1);
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations(-1);
final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(registeredDataNodes.size()));
- final CountDownLatch countDownLatch = new CountDownLatch(registeredDataNodes.size());
+ Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
- Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
- Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger(0);
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- handlerMap.put(
- index.get(),
- new FunctionManagementHandler(
- countDownLatch,
- dataNodeInfo.getLocation(),
- dataNodeResponseStatus,
- DataNodeRequestType.DROP_FUNCTION,
- dataNodeLocations,
- index.get()));
- dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
- }
AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(request, handlerMap, dataNodeLocations);
+ .sendAsyncRequestToDataNodeWithRetry(
+ request,
+ dataNodeLocationMap,
+ DataNodeRequestType.DROP_FUNCTION,
+ dataNodeResponseStatus);
return dataNodeResponseStatus;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b52b1cd55d..9886593212 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -35,10 +35,8 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -66,7 +64,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -299,30 +296,20 @@ public class LoadManager {
private void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
- List<TDataNodeConfiguration> onlineDataNodes = getOnlineDataNodes(-1);
- CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+ getOnlineDataNodes(-1)
+ .forEach(
+ onlineDataNode ->
+ dataNodeLocationMap.put(
+ onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
- Map<Integer, AbstractRetryHandler> handlerMap = new HashMap<>();
- Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
- AtomicInteger index = new AtomicInteger();
- onlineDataNodes.forEach(
- dataNodeInfo -> {
- handlerMap.put(
- index.get(),
- new UpdateRegionRouteMapHandler(
- dataNodeInfo.getLocation(),
- latch,
- DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
- dataNodeLocations,
- index.get()));
- dataNodeLocations.put(index.getAndIncrement(), dataNodeInfo.getLocation());
- });
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
- handlerMap,
- dataNodeLocations);
+ dataNodeLocationMap,
+ DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+ null);
LOGGER.info("Broadcast the latest RegionRouteMap finished.");
}