You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/06 15:06:38 UTC
[iotdb] branch master updated: [IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 96c860f876 [IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)
96c860f876 is described below
commit 96c860f8768bb3a3e3bad9f3167f532151fc1859
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun Nov 6 23:06:32 2022 +0800
[IOTDB-4807] Check nodes' status at the INVALIDATE_CACHE state (#7895)
---
.../procedure/env/ConfigNodeProcedureEnv.java | 60 ++++++++++++++--------
.../procedure/store/ConfigProcedureStore.java | 2 +-
2 files changed, 41 insertions(+), 21 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e96fe4269b..8724be24ec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -81,6 +81,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class ConfigNodeProcedureEnv {
@@ -143,30 +144,49 @@ public class ConfigNodeProcedureEnv {
* @throws TException Thrift IOE
*/
public boolean invalidateCache(String storageGroupName) throws IOException, TException {
- List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes();
+ NodeManager nodeManager = configManager.getNodeManager();
+ List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes();
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
invalidateCacheReq.setStorageGroup(true);
invalidateCacheReq.setFullPath(storageGroupName);
for (TDataNodeConfiguration dataNodeConfiguration : allDataNodes) {
- final TSStatus invalidateSchemaStatus =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNodeConfiguration.getLocation().getInternalEndPoint(),
- invalidateCacheReq,
- DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
- final TSStatus invalidatePartitionStatus =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNodeConfiguration.getLocation().getInternalEndPoint(),
- invalidateCacheReq,
- DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
- if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
- LOG.error(
- "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
- invalidatePartitionStatus,
- invalidateSchemaStatus);
- return false;
+ int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+
+ // if the node is not alive, sleep 1 second and try again
+ NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+ if (nodeStatus == NodeStatus.Unknown) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
+ }
+ nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+ }
+
+ if (nodeStatus == NodeStatus.Running) {
+ final TSStatus invalidateSchemaStatus =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeConfiguration.getLocation().getInternalEndPoint(),
+ invalidateCacheReq,
+ DataNodeRequestType.INVALIDATE_SCHEMA_CACHE);
+ final TSStatus invalidatePartitionStatus =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeConfiguration.getLocation().getInternalEndPoint(),
+ invalidateCacheReq,
+ DataNodeRequestType.INVALIDATE_PARTITION_CACHE);
+ if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
+ LOG.error(
+ "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
+ invalidatePartitionStatus,
+ invalidateSchemaStatus);
+ return false;
+ }
+ } else if (nodeStatus == NodeStatus.Unknown) {
+ LOG.warn(
+ "Invalidate cache failed, because DataNode {} is Unknown",
+ dataNodeConfiguration.getLocation().getInternalEndPoint());
}
}
return true;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 2d88389e10..ef27ed492f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -36,7 +36,7 @@ import java.util.List;
public class ConfigProcedureStore implements IProcedureStore {
- private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigProcedureStore.class);
private volatile boolean isRunning = false;
private final ProcedureInfo procedureInfo;