You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/09 06:02:31 UTC

[iotdb] branch beyyes/master2 created (now a1ef119387)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a1ef119387 add sendSyncRequestToDataNodeWithGivenRetry method

This branch includes the following new commits:

     new a1ef119387 add sendSyncRequestToDataNodeWithGivenRetry method

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add sendSyncRequestToDataNodeWithGivenRetry method

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1ef119387b6311d056399f965f4e5cb09c980ab
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 9 14:02:12 2022 +0800

    add sendSyncRequestToDataNodeWithGivenRetry method
---
 .../client/sync/SyncDataNodeClientPool.java        | 98 ++++++++++++++--------
 .../confignode/persistence/node/NodeInfo.java      |  4 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 11 ++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 4 files changed, 73 insertions(+), 42 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index d50210bb4d..f70201db65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -50,7 +50,7 @@ public class SyncDataNodeClientPool {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataNodeClientPool.class);
 
-  private static final int retryNum = 6;
+  private static final int DEFAULT_RETRY_NUM = 6;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
@@ -64,41 +64,9 @@ public class SyncDataNodeClientPool {
   public TSStatus sendSyncRequestToDataNodeWithRetry(
       TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
     Throwable lastException = null;
-    for (int retry = 0; retry < retryNum; retry++) {
+    for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
       try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
-        switch (requestType) {
-          case INVALIDATE_PARTITION_CACHE:
-            return client.invalidatePartitionCache((TInvalidateCacheReq) req);
-          case INVALIDATE_SCHEMA_CACHE:
-            return client.invalidateSchemaCache((TInvalidateCacheReq) req);
-          case CREATE_SCHEMA_REGION:
-            return client.createSchemaRegion((TCreateSchemaRegionReq) req);
-          case CREATE_DATA_REGION:
-            return client.createDataRegion((TCreateDataRegionReq) req);
-          case DELETE_REGION:
-            return client.deleteRegion((TConsensusGroupId) req);
-          case INVALIDATE_PERMISSION_CACHE:
-            return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
-          case DISABLE_DATA_NODE:
-            return client.disableDataNode((TDisableDataNodeReq) req);
-          case STOP_DATA_NODE:
-            return client.stopDataNode();
-          case SET_SYSTEM_STATUS:
-            return client.setSystemStatus((String) req);
-          case UPDATE_TEMPLATE:
-            return client.updateTemplate((TUpdateTemplateReq) req);
-          case CREATE_NEW_REGION_PEER:
-            return client.createNewRegionPeer((TCreatePeerReq) req);
-          case ADD_REGION_PEER:
-            return client.addRegionPeer((TMaintainPeerReq) req);
-          case REMOVE_REGION_PEER:
-            return client.removeRegionPeer((TMaintainPeerReq) req);
-          case DELETE_OLD_REGION_PEER:
-            return client.deleteOldRegionPeer((TMaintainPeerReq) req);
-          default:
-            return RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
-        }
+        return executeSyncRequest(requestType, client, req);
       } catch (TException | IOException e) {
         lastException = e;
         LOGGER.warn(
@@ -115,6 +83,66 @@ public class SyncDataNodeClientPool {
         .setMessage("All retry failed due to: " + lastException.getMessage());
   }
 
+  public TSStatus sendSyncRequestToDataNodeWithGivenRetry(
+          TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
+    Throwable lastException = new TException();
+    for (int retry = 0; retry < retryNum; retry++) {
+      try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+        return executeSyncRequest(requestType, client, req);
+      } catch (TException | IOException e) {
+        lastException = e;
+        LOGGER.warn(
+                "{} failed on DataNode {}, because {}, retrying {}...",
+                requestType,
+                endPoint,
+                e.getMessage(),
+                retry);
+        doRetryWait(retry);
+      }
+    }
+    LOGGER.error("{} failed on DataNode {}", requestType, endPoint, lastException);
+    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+            .setMessage("All retry failed due to: " + lastException.getMessage());
+  }
+
+  private TSStatus executeSyncRequest(DataNodeRequestType requestType,
+                                      SyncDataNodeInternalServiceClient client,
+                                      Object req) throws TException {
+    switch (requestType) {
+      case INVALIDATE_PARTITION_CACHE:
+        return client.invalidatePartitionCache((TInvalidateCacheReq) req);
+      case INVALIDATE_SCHEMA_CACHE:
+        return client.invalidateSchemaCache((TInvalidateCacheReq) req);
+      case CREATE_SCHEMA_REGION:
+        return client.createSchemaRegion((TCreateSchemaRegionReq) req);
+      case CREATE_DATA_REGION:
+        return client.createDataRegion((TCreateDataRegionReq) req);
+      case DELETE_REGION:
+        return client.deleteRegion((TConsensusGroupId) req);
+      case INVALIDATE_PERMISSION_CACHE:
+        return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
+      case DISABLE_DATA_NODE:
+        return client.disableDataNode((TDisableDataNodeReq) req);
+      case STOP_DATA_NODE:
+        return client.stopDataNode();
+      case SET_SYSTEM_STATUS:
+        return client.setSystemStatus((String) req);
+      case UPDATE_TEMPLATE:
+        return client.updateTemplate((TUpdateTemplateReq) req);
+      case CREATE_NEW_REGION_PEER:
+        return client.createNewRegionPeer((TCreatePeerReq) req);
+      case ADD_REGION_PEER:
+        return client.addRegionPeer((TMaintainPeerReq) req);
+      case REMOVE_REGION_PEER:
+        return client.removeRegionPeer((TMaintainPeerReq) req);
+      case DELETE_OLD_REGION_PEER:
+        return client.deleteOldRegionPeer((TMaintainPeerReq) req);
+      default:
+        return RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
+    }
+  }
+
   private void doRetryWait(int retryNum) {
     try {
       TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index ad2f4d23b3..cb6e0f22ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -165,7 +165,7 @@ public class NodeInfo implements SnapshotProcessor {
    */
   public TSStatus removeDataNode(RemoveDataNodePlan req) {
     LOGGER.info(
-        "{}, There are {} data node in cluster before executed remove-datanode.sh",
+        "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
         REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
 
@@ -181,7 +181,7 @@ public class NodeInfo implements SnapshotProcessor {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
     LOGGER.info(
-        "{}, There are {} data node in cluster after executed remove-datanode.sh",
+        "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
         REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 2e29166188..525d3fb783 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -412,14 +412,17 @@ public class DataNodeRemoveHandler {
    * @param dataNode old data node
    */
   public void stopDataNode(TDataNodeLocation dataNode) {
-    LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode);
+    LOGGER.info(
+        "{}, Begin to stop DataNode and kill the DataNode process {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNode);
     AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
     TSStatus status =
         SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNodeWithRetry(
-                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
+            .sendSyncRequestToDataNodeWithGivenRetry(
+                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
     configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
-    LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, dataNode, status);
+    LOGGER.info("{}, Stop Data Node result: {}, stoppedDataNode: {}", REMOVE_DATANODE_PROCESS, status, dataNode);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ecade45ea8..cef5dda081 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       DataNode.getInstance().stop();
       status.setMessage("stop datanode succeed");
     } catch (Exception e) {
-      LOGGER.error("stop Data Node error", e);
+      LOGGER.error("Stop Data Node error", e);
       status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
     }