You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/09 06:02:32 UTC
[iotdb] 01/01: add sendSyncRequestToDataNodeWithGivenRetry method
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1ef119387b6311d056399f965f4e5cb09c980ab
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 9 14:02:12 2022 +0800
add sendSyncRequestToDataNodeWithGivenRetry method
---
.../client/sync/SyncDataNodeClientPool.java | 98 ++++++++++++++--------
.../confignode/persistence/node/NodeInfo.java | 4 +-
.../procedure/env/DataNodeRemoveHandler.java | 11 ++-
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
4 files changed, 73 insertions(+), 42 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index d50210bb4d..f70201db65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -50,7 +50,7 @@ public class SyncDataNodeClientPool {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataNodeClientPool.class);
- private static final int retryNum = 6;
+ private static final int DEFAULT_RETRY_NUM = 6;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
@@ -64,41 +64,9 @@ public class SyncDataNodeClientPool {
public TSStatus sendSyncRequestToDataNodeWithRetry(
TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
Throwable lastException = null;
- for (int retry = 0; retry < retryNum; retry++) {
+ for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
- switch (requestType) {
- case INVALIDATE_PARTITION_CACHE:
- return client.invalidatePartitionCache((TInvalidateCacheReq) req);
- case INVALIDATE_SCHEMA_CACHE:
- return client.invalidateSchemaCache((TInvalidateCacheReq) req);
- case CREATE_SCHEMA_REGION:
- return client.createSchemaRegion((TCreateSchemaRegionReq) req);
- case CREATE_DATA_REGION:
- return client.createDataRegion((TCreateDataRegionReq) req);
- case DELETE_REGION:
- return client.deleteRegion((TConsensusGroupId) req);
- case INVALIDATE_PERMISSION_CACHE:
- return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
- case DISABLE_DATA_NODE:
- return client.disableDataNode((TDisableDataNodeReq) req);
- case STOP_DATA_NODE:
- return client.stopDataNode();
- case SET_SYSTEM_STATUS:
- return client.setSystemStatus((String) req);
- case UPDATE_TEMPLATE:
- return client.updateTemplate((TUpdateTemplateReq) req);
- case CREATE_NEW_REGION_PEER:
- return client.createNewRegionPeer((TCreatePeerReq) req);
- case ADD_REGION_PEER:
- return client.addRegionPeer((TMaintainPeerReq) req);
- case REMOVE_REGION_PEER:
- return client.removeRegionPeer((TMaintainPeerReq) req);
- case DELETE_OLD_REGION_PEER:
- return client.deleteOldRegionPeer((TMaintainPeerReq) req);
- default:
- return RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
- }
+ return executeSyncRequest(requestType, client, req);
} catch (TException | IOException e) {
lastException = e;
LOGGER.warn(
@@ -115,6 +83,66 @@ public class SyncDataNodeClientPool {
.setMessage("All retry failed due to: " + lastException.getMessage());
}
+  /**
+   * Sends a synchronous request to the DataNode at {@code endPoint}, making at most {@code
+   * retryNum} attempts.
+   *
+   * <p>Each attempt borrows a client from {@code clientManager} and dispatches the request through
+   * {@code executeSyncRequest}. When an attempt fails with a {@link TException} or {@link
+   * IOException}, the failure is logged and, if another attempt remains, the calling thread backs
+   * off via {@code doRetryWait} before trying again.
+   *
+   * @param endPoint internal endpoint of the target DataNode
+   * @param req request payload; cast to the request class selected by {@code requestType}
+   * @param requestType which DataNode RPC to invoke
+   * @param retryNum maximum number of attempts; if it is not positive, no request is sent
+   * @return the status returned by the DataNode, or a status with code {@code ALL_RETRY_FAILED}
+   *     when no attempt succeeded
+   */
+  public TSStatus sendSyncRequestToDataNodeWithGivenRetry(
+      TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
+    // Placeholder for the case where the loop body never runs (retryNum <= 0), so the returned
+    // status carries a meaningful message instead of "null".
+    Throwable lastException =
+        new TException("no request was sent because retryNum is " + retryNum);
+    for (int retry = 0; retry < retryNum; retry++) {
+      try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+        return executeSyncRequest(requestType, client, req);
+      } catch (TException | IOException e) {
+        lastException = e;
+        LOGGER.warn(
+            "{} failed on DataNode {}, because {}, retrying {}...",
+            requestType,
+            endPoint,
+            e.getMessage(),
+            retry);
+        // Back off only when another attempt follows; sleeping after the last failure would
+        // merely delay reporting the error to the caller.
+        if (retry < retryNum - 1) {
+          doRetryWait(retry);
+        }
+      }
+    }
+    LOGGER.error("{} failed on DataNode {}", requestType, endPoint, lastException);
+    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+        .setMessage("All retry failed due to: " + lastException.getMessage());
+  }
+
+  /**
+   * Invokes the DataNode RPC that corresponds to {@code requestType} on the given client.
+   *
+   * <p>{@code req} is cast to the request class the selected RPC expects; {@code STOP_DATA_NODE}
+   * takes no argument, so {@code req} is ignored for it. A request type without a matching case
+   * produces an {@code EXECUTE_STATEMENT_ERROR} status rather than an exception.
+   *
+   * @param requestType which RPC to invoke
+   * @param client client on which the RPC is invoked
+   * @param req request payload for the RPC
+   * @return the status returned by the RPC, or the error status for an unknown request type
+   * @throws TException propagated from the underlying client call
+   */
+  private TSStatus executeSyncRequest(
+      DataNodeRequestType requestType, SyncDataNodeInternalServiceClient client, Object req)
+      throws TException {
+    final TSStatus response;
+    switch (requestType) {
+      case INVALIDATE_PARTITION_CACHE:
+        response = client.invalidatePartitionCache((TInvalidateCacheReq) req);
+        break;
+      case INVALIDATE_SCHEMA_CACHE:
+        response = client.invalidateSchemaCache((TInvalidateCacheReq) req);
+        break;
+      case CREATE_SCHEMA_REGION:
+        response = client.createSchemaRegion((TCreateSchemaRegionReq) req);
+        break;
+      case CREATE_DATA_REGION:
+        response = client.createDataRegion((TCreateDataRegionReq) req);
+        break;
+      case DELETE_REGION:
+        response = client.deleteRegion((TConsensusGroupId) req);
+        break;
+      case INVALIDATE_PERMISSION_CACHE:
+        response = client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
+        break;
+      case DISABLE_DATA_NODE:
+        response = client.disableDataNode((TDisableDataNodeReq) req);
+        break;
+      case STOP_DATA_NODE:
+        response = client.stopDataNode();
+        break;
+      case SET_SYSTEM_STATUS:
+        response = client.setSystemStatus((String) req);
+        break;
+      case UPDATE_TEMPLATE:
+        response = client.updateTemplate((TUpdateTemplateReq) req);
+        break;
+      case CREATE_NEW_REGION_PEER:
+        response = client.createNewRegionPeer((TCreatePeerReq) req);
+        break;
+      case ADD_REGION_PEER:
+        response = client.addRegionPeer((TMaintainPeerReq) req);
+        break;
+      case REMOVE_REGION_PEER:
+        response = client.removeRegionPeer((TMaintainPeerReq) req);
+        break;
+      case DELETE_OLD_REGION_PEER:
+        response = client.deleteOldRegionPeer((TMaintainPeerReq) req);
+        break;
+      default:
+        response =
+            RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
+        break;
+    }
+    return response;
+  }
+
private void doRetryWait(int retryNum) {
try {
TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index ad2f4d23b3..cb6e0f22ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -165,7 +165,7 @@ public class NodeInfo implements SnapshotProcessor {
*/
public TSStatus removeDataNode(RemoveDataNodePlan req) {
LOGGER.info(
- "{}, There are {} data node in cluster before executed remove-datanode.sh",
+ "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
REMOVE_DATANODE_PROCESS,
registeredDataNodes.size());
@@ -181,7 +181,7 @@ public class NodeInfo implements SnapshotProcessor {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
LOGGER.info(
- "{}, There are {} data node in cluster after executed remove-datanode.sh",
+ "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
REMOVE_DATANODE_PROCESS,
registeredDataNodes.size());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 2e29166188..525d3fb783 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -412,14 +412,17 @@ public class DataNodeRemoveHandler {
* @param dataNode old data node
*/
public void stopDataNode(TDataNodeLocation dataNode) {
- LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode);
+ LOGGER.info(
+ "{}, Begin to stop DataNode and kill the DataNode process {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
- LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, dataNode, status);
+ LOGGER.info("{}, Stop Data Node result: {}, stoppedDataNode: {}", REMOVE_DATANODE_PROCESS, status, dataNode);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ecade45ea8..cef5dda081 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
DataNode.getInstance().stop();
status.setMessage("stop datanode succeed");
} catch (Exception e) {
- LOGGER.error("stop Data Node error", e);
+ LOGGER.error("Stop Data Node error", e);
status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
status.setMessage(e.getMessage());
}