You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/06 15:06:38 UTC

[iotdb] branch master updated: [IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 96c860f876 [IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)
96c860f876 is described below

commit 96c860f8768bb3a3e3bad9f3167f532151fc1859
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun Nov 6 23:06:32 2022 +0800

    [IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)
---
 .../procedure/env/ConfigNodeProcedureEnv.java      | 60 ++++++++++++++--------
 .../procedure/store/ConfigProcedureStore.java      |  2 +-
 2 files changed, 41 insertions(+), 21 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e96fe4269b..8724be24ec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -81,6 +81,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class ConfigNodeProcedureEnv {
@@ -143,30 +144,49 @@ public class ConfigNodeProcedureEnv {
    * @throws TException Thrift IOE
    */
   public boolean invalidateCache(String storageGroupName) throws IOException, TException {
-    List<TDataNodeConfiguration> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes();
+    NodeManager nodeManager = configManager.getNodeManager();
+    List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
     for (TDataNodeConfiguration dataNodeConfiguration : allDataNodes) {
-      final TSStatus invalidateSchemaStatus =
-          SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNodeWithRetry(
-                  dataNodeConfiguration.getLocation().getInternalEndPoint(),
-                  invalidateCacheReq,
-                  DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
-      final TSStatus invalidatePartitionStatus =
-          SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNodeWithRetry(
-                  dataNodeConfiguration.getLocation().getInternalEndPoint(),
-                  invalidateCacheReq,
-                  DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
-      if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
-        LOG.error(
-            "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
-            invalidatePartitionStatus,
-            invalidateSchemaStatus);
-        return false;
+      int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+
+      // if the node is not alive, sleep 1 second and try again
+      NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+      if (nodeStatus == NodeStatus.Unknown) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
+        }
+        nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+      }
+
+      if (nodeStatus == NodeStatus.Running) {
+        final TSStatus invalidateSchemaStatus =
+            SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithRetry(
+                    dataNodeConfiguration.getLocation().getInternalEndPoint(),
+                    invalidateCacheReq,
+                    DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
+        final TSStatus invalidatePartitionStatus =
+            SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithRetry(
+                    dataNodeConfiguration.getLocation().getInternalEndPoint(),
+                    invalidateCacheReq,
+                    DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
+        if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
+          LOG.error(
+              "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
+              invalidatePartitionStatus,
+              invalidateSchemaStatus);
+          return false;
+        }
+      } else if (nodeStatus == NodeStatus.Unknown) {
+        LOG.warn(
+            "Invalidate cache failed, because DataNode {} is Unknown",
+            dataNodeConfiguration.getLocation().getInternalEndPoint());
       }
     }
     return true;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 2d88389e10..ef27ed492f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -36,7 +36,7 @@ import java.util.List;
 
 public class ConfigProcedureStore implements IProcedureStore {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigProcedureStore.class);
 
   private volatile boolean isRunning = false;
   private final ProcedureInfo procedureInfo;