You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/09 06:02:32 UTC

[iotdb] 01/01: add sendSyncRequestToDataNodeWithGivenRetry method

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1ef119387b6311d056399f965f4e5cb09c980ab
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 9 14:02:12 2022 +0800

    add sendSyncRequestToDataNodeWithGivenRetry method
---
 .../client/sync/SyncDataNodeClientPool.java        | 98 ++++++++++++++--------
 .../confignode/persistence/node/NodeInfo.java      |  4 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 11 ++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 4 files changed, 73 insertions(+), 42 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index d50210bb4d..f70201db65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -50,7 +50,7 @@ public class SyncDataNodeClientPool {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataNodeClientPool.class);
 
-  private static final int retryNum = 6;
+  private static final int DEFAULT_RETRY_NUM = 6;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
@@ -64,41 +64,9 @@ public class SyncDataNodeClientPool {
   public TSStatus sendSyncRequestToDataNodeWithRetry(
       TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
     Throwable lastException = null;
-    for (int retry = 0; retry < retryNum; retry++) {
+    for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
       try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
-        switch (requestType) {
-          case INVALIDATE_PARTITION_CACHE:
-            return client.invalidatePartitionCache((TInvalidateCacheReq) req);
-          case INVALIDATE_SCHEMA_CACHE:
-            return client.invalidateSchemaCache((TInvalidateCacheReq) req);
-          case CREATE_SCHEMA_REGION:
-            return client.createSchemaRegion((TCreateSchemaRegionReq) req);
-          case CREATE_DATA_REGION:
-            return client.createDataRegion((TCreateDataRegionReq) req);
-          case DELETE_REGION:
-            return client.deleteRegion((TConsensusGroupId) req);
-          case INVALIDATE_PERMISSION_CACHE:
-            return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
-          case DISABLE_DATA_NODE:
-            return client.disableDataNode((TDisableDataNodeReq) req);
-          case STOP_DATA_NODE:
-            return client.stopDataNode();
-          case SET_SYSTEM_STATUS:
-            return client.setSystemStatus((String) req);
-          case UPDATE_TEMPLATE:
-            return client.updateTemplate((TUpdateTemplateReq) req);
-          case CREATE_NEW_REGION_PEER:
-            return client.createNewRegionPeer((TCreatePeerReq) req);
-          case ADD_REGION_PEER:
-            return client.addRegionPeer((TMaintainPeerReq) req);
-          case REMOVE_REGION_PEER:
-            return client.removeRegionPeer((TMaintainPeerReq) req);
-          case DELETE_OLD_REGION_PEER:
-            return client.deleteOldRegionPeer((TMaintainPeerReq) req);
-          default:
-            return RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
-        }
+        return executeSyncRequest(requestType, client, req);
       } catch (TException | IOException e) {
         lastException = e;
         LOGGER.warn(
@@ -115,6 +83,66 @@ public class SyncDataNodeClientPool {
         .setMessage("All retry failed due to: " + lastException.getMessage());
   }
 
+  public TSStatus sendSyncRequestToDataNodeWithGivenRetry(
+          TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
+    Throwable lastException = new TException();
+    for (int retry = 0; retry < retryNum; retry++) {
+      try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+        return executeSyncRequest(requestType, client, req);
+      } catch (TException | IOException e) {
+        lastException = e;
+        LOGGER.warn(
+                "{} failed on DataNode {}, because {}, retrying {}...",
+                requestType,
+                endPoint,
+                e.getMessage(),
+                retry);
+        doRetryWait(retry);
+      }
+    }
+    LOGGER.error("{} failed on DataNode {}", requestType, endPoint, lastException);
+    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+            .setMessage("All retry failed due to: " + lastException.getMessage());
+  }
+
+  private TSStatus executeSyncRequest(DataNodeRequestType requestType,
+                                      SyncDataNodeInternalServiceClient client,
+                                      Object req) throws TException {
+    switch (requestType) {
+      case INVALIDATE_PARTITION_CACHE:
+        return client.invalidatePartitionCache((TInvalidateCacheReq) req);
+      case INVALIDATE_SCHEMA_CACHE:
+        return client.invalidateSchemaCache((TInvalidateCacheReq) req);
+      case CREATE_SCHEMA_REGION:
+        return client.createSchemaRegion((TCreateSchemaRegionReq) req);
+      case CREATE_DATA_REGION:
+        return client.createDataRegion((TCreateDataRegionReq) req);
+      case DELETE_REGION:
+        return client.deleteRegion((TConsensusGroupId) req);
+      case INVALIDATE_PERMISSION_CACHE:
+        return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
+      case DISABLE_DATA_NODE:
+        return client.disableDataNode((TDisableDataNodeReq) req);
+      case STOP_DATA_NODE:
+        return client.stopDataNode();
+      case SET_SYSTEM_STATUS:
+        return client.setSystemStatus((String) req);
+      case UPDATE_TEMPLATE:
+        return client.updateTemplate((TUpdateTemplateReq) req);
+      case CREATE_NEW_REGION_PEER:
+        return client.createNewRegionPeer((TCreatePeerReq) req);
+      case ADD_REGION_PEER:
+        return client.addRegionPeer((TMaintainPeerReq) req);
+      case REMOVE_REGION_PEER:
+        return client.removeRegionPeer((TMaintainPeerReq) req);
+      case DELETE_OLD_REGION_PEER:
+        return client.deleteOldRegionPeer((TMaintainPeerReq) req);
+      default:
+        return RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
+    }
+  }
+
   private void doRetryWait(int retryNum) {
     try {
       TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index ad2f4d23b3..cb6e0f22ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -165,7 +165,7 @@ public class NodeInfo implements SnapshotProcessor {
    */
   public TSStatus removeDataNode(RemoveDataNodePlan req) {
     LOGGER.info(
-        "{}, There are {} data node in cluster before executed remove-datanode.sh",
+        "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
         REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
 
@@ -181,7 +181,7 @@ public class NodeInfo implements SnapshotProcessor {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
     LOGGER.info(
-        "{}, There are {} data node in cluster after executed remove-datanode.sh",
+        "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
         REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 2e29166188..525d3fb783 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -412,14 +412,17 @@ public class DataNodeRemoveHandler {
    * @param dataNode old data node
    */
   public void stopDataNode(TDataNodeLocation dataNode) {
-    LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode);
+    LOGGER.info(
+        "{}, Begin to stop DataNode and kill the DataNode process {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNode);
     AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
     TSStatus status =
         SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNodeWithRetry(
-                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
+            .sendSyncRequestToDataNodeWithGivenRetry(
+                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
     configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
-    LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, dataNode, status);
+    LOGGER.info("{}, Stop Data Node result: {}, stoppedDataNode: {}", REMOVE_DATANODE_PROCESS, status, dataNode);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ecade45ea8..cef5dda081 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       DataNode.getInstance().stop();
       status.setMessage("stop datanode succeed");
     } catch (Exception e) {
-      LOGGER.error("stop Data Node error", e);
+      LOGGER.error("Stop Data Node error", e);
       status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
     }