You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/01/12 06:12:13 UTC

[iotdb] 01/01: [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch optimize/iotdb-5403
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e28625ce88abfa46125f731450c24ed48abdbd01
Author: ericpai <er...@hotmail.com>
AuthorDate: Thu Jan 12 14:11:56 2023 +0800

    [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping
---
 .../apache/iotdb/it/env/cluster/AbstractEnv.java   | 64 ++++++++++++++++++++--
 .../iotdb/it/env/cluster/AbstractNodeWrapper.java  |  7 ---
 .../iotdb/it/env/remote/RemoteServerEnv.java       |  7 +++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  | 13 +++++
 .../apache/iotdb/itbase/env/BaseNodeWrapper.java   |  2 -
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 60 +++++---------------
 .../load/IoTDBClusterRegionLeaderBalancingIT.java  | 28 ++--------
 .../it/partition/IoTDBPartitionDurableIT.java      | 24 +++-----
 8 files changed, 106 insertions(+), 99 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
index 3f1c32d58c..eade9e47d0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
@@ -18,17 +18,22 @@
  */
 package org.apache.iotdb.it.env.cluster;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestLogger;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.itbase.env.ClusterConfig;
 import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
 import org.apache.iotdb.itbase.runtime.NodeConnection;
@@ -43,6 +48,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.pool.SessionPool;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -52,6 +58,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -288,7 +295,6 @@ public abstract class AbstractEnv implements BaseEnv {
         Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream())
             .collect(Collectors.toList())) {
       nodeWrapper.stop();
-      nodeWrapper.waitingToShutDown();
       nodeWrapper.destroyDir();
       String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
       if (!new File(lockPath).delete()) {
@@ -418,6 +424,7 @@ public abstract class AbstractEnv implements BaseEnv {
     this.testMethodName = testMethodName;
   }
 
+  @Override
   public void dumpTestJVMSnapshot() {
     for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
       configNodeWrapper.dumpJVMSnapshot(testMethodName);
@@ -462,9 +469,6 @@ public abstract class AbstractEnv implements BaseEnv {
           if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             // Only the ConfigNodeClient who connects to the ConfigNode-leader
             // will respond the SUCCESS_STATUS
-            logger.info(
-                "Successfully get connection to the leader ConfigNode: {}",
-                configNodeWrapper.getIpAndPortString());
             return client;
           } else {
             // Return client otherwise
@@ -656,10 +660,86 @@ public abstract class AbstractEnv implements BaseEnv {
     dataNodeWrapperList.get(index).start();
   }
 
+  @Override
   public void shutdownDataNode(int index) {
     dataNodeWrapperList.get(index).stop();
   }
 
+  /**
+   * Polls {@code showCluster} on the leader ConfigNode once per second, up to 30 times, until
+   * every node in {@code nodes} reports the status at the same index of {@code targetStatus}.
+   */
+  @Override
+  public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+      throws IllegalStateException {
+    if (nodes.size() != targetStatus.size()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "nodes and targetStatus must have the same size, but got %d and %d",
+              nodes.size(), targetStatus.size()));
+    }
+    final int maxRetryTimes = 30;
+    Throwable lastException = null;
+    for (int i = 0; i < maxRetryTimes; i++) {
+      try (SyncConfigNodeIServiceClient client =
+          (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        TShowClusterResp showClusterResp = client.showCluster();
+        // Index every registered node by "ip:port": ConfigNodes by their internal endpoint and
+        // DataNodes by their client RPC endpoint. This assumes getIpAndPortString() returns the
+        // same endpoint for each wrapper type.
+        Map<String, Integer> nodeIds = new HashMap<>();
+        for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) {
+          nodeIds.put(
+              node.getInternalEndPoint().getIp() + ":" + node.getInternalEndPoint().getPort(),
+              node.getConfigNodeId());
+        }
+        for (TDataNodeLocation node : showClusterResp.getDataNodeList()) {
+          nodeIds.put(
+              node.getClientRpcEndPoint().getIp() + ":" + node.getClientRpcEndPoint().getPort(),
+              node.getDataNodeId());
+        }
+        List<String> errorMessages = new ArrayList<>(nodes.size());
+        for (int j = 0; j < nodes.size(); j++) {
+          String endpoint = nodes.get(j).getIpAndPortString();
+          Integer nodeId = nodeIds.get(endpoint);
+          if (nodeId == null) {
+            // The node may not be registered yet, so retry instead of failing immediately
+            errorMessages.add("The node " + endpoint + " is not found");
+            continue;
+          }
+          String status = showClusterResp.getNodeStatus().get(nodeId);
+          if (!targetStatus.get(j).getStatus().equals(status)) {
+            errorMessages.add(
+                String.format(
+                    "Node %s is in status %s, but expected %s",
+                    endpoint, status, targetStatus.get(j)));
+          }
+        }
+        if (errorMessages.isEmpty()) {
+          return;
+        }
+        lastException = new IllegalStateException(String.join(". ", errorMessages));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("Interrupted while checking node status", e);
+      } catch (TException | ClientManagerException | IOException e) {
+        // Connection or RPC failure: remember it and retry
+        lastException = e;
+      }
+      if (i < maxRetryTimes - 1) {
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException("Interrupted while checking node status", e);
+        }
+      }
+    }
+    throw new IllegalStateException(
+        "Nodes did not reach the target status after " + maxRetryTimes + " attempts",
+        lastException);
+  }
+
   @Override
   public int getMqttPort() {
     int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index 6d3ed5ac7c..d462e3f19d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -291,13 +291,6 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
       return;
     }
     this.instance.destroy();
-  }
-
-  @Override
-  public void waitingToShutDown() {
-    if (this.instance == null) {
-      return;
-    }
     try {
       if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
         this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
index 4a5618fdc8..e1844a5beb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.itbase.env.ClusterConfig;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.Constant;
@@ -170,6 +172,11 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
     throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 31dbf0d767..4132e7b6db 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.itbase.env;
 
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
@@ -203,6 +204,18 @@ public interface BaseEnv {
   /** Shutdown an existed ConfigNode */
   void shutdownConfigNode(int index);
 
+  /**
+   * Ensure that each node is in its corresponding target status, checking repeatedly for a while.
+   *
+   * @param nodes the nodes to check.
+   * @param targetStatus the expected {@link NodeStatus} of each node, in the same order as {@code
+   *     nodes}. It must have the same length as {@code nodes}.
+   * @throws IllegalStateException if some nodes are still not in their target status after all
+   *     checks are exhausted.
+   */
+  void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+      throws IllegalStateException;
+
   /**
    * Get the {@link ConfigNodeWrapper} of the specified index.
    *
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
index 32bc15d386..e0ab3ebd0f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
@@ -28,8 +28,6 @@ public interface BaseNodeWrapper {
 
   void stop();
 
-  void waitingToShutDown();
-
   String getIp();
 
   int getPort();
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index b42e0c1071..8ec75c86d3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
@@ -286,29 +285,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
       // Shutdown and check
       EnvFactory.getEnv().shutdownConfigNode(1);
       EnvFactory.getEnv().shutdownDataNode(0);
-      int retryTimes;
-      for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
-        AtomicInteger unknownCnt = new AtomicInteger(0);
-        showClusterResp = client.showCluster();
-        showClusterResp
-            .getNodeStatus()
-            .forEach(
-                (nodeId, status) -> {
-                  if (NodeStatus.Unknown.equals(NodeStatus.parse(status))) {
-                    unknownCnt.getAndIncrement();
-                  }
-                });
-
-        if (unknownCnt.get() == testNodeNum - 2) {
-          break;
-        }
-        TimeUnit.SECONDS.sleep(1);
-      }
-      logger.info(showClusterStatus(showClusterResp));
-      if (retryTimes >= maxRetryTimes) {
-        Assert.fail(
-            "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Arrays.asList(
+                  EnvFactory.getEnv().getConfigNodeWrapper(1),
+                  EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Arrays.asList(NodeStatus.Unknown, NodeStatus.Unknown));
 
       /* Restart and updatePeer */
       // TODO: @Itami-sho, enable this test and delete it
@@ -338,28 +320,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
       // Restart and check
       EnvFactory.getEnv().startConfigNode(1);
       EnvFactory.getEnv().startDataNode(0);
-      for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
-        AtomicInteger runningCnt = new AtomicInteger(0);
-        showClusterResp = client.showCluster();
-        showClusterResp
-            .getNodeStatus()
-            .forEach(
-                (nodeId, status) -> {
-                  if (NodeStatus.Running.equals(NodeStatus.parse(status))) {
-                    runningCnt.getAndIncrement();
-                  }
-                });
-
-        if (runningCnt.get() == testNodeNum) {
-          break;
-        }
-        TimeUnit.SECONDS.sleep(1);
-      }
-      logger.info(showClusterStatus(showClusterResp));
-      if (retryTimes >= maxRetryTimes) {
-        Assert.fail(
-            "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Arrays.asList(
+                  EnvFactory.getEnv().getConfigNodeWrapper(1),
+                  EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Arrays.asList(NodeStatus.Running, NodeStatus.Running));
     }
   }
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 42de1b6551..0339f3efc0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -198,30 +197,11 @@ public class IoTDBClusterRegionLeaderBalancingIT {
       Assert.assertTrue(isDistributionBalanced);
 
       // Shutdown a DataNode
-      boolean isDataNodeShutdown = false;
       EnvFactory.getEnv().shutdownDataNode(0);
-      for (int retry = 0; retry < retryNum; retry++) {
-        AtomicInteger runningCnt = new AtomicInteger(0);
-        AtomicInteger unknownCnt = new AtomicInteger(0);
-        TShowDataNodesResp showDataNodesResp = client.showDataNodes();
-        showDataNodesResp
-            .getDataNodesInfoList()
-            .forEach(
-                dataNodeInfo -> {
-                  if (NodeStatus.Running.getStatus().equals(dataNodeInfo.getStatus())) {
-                    runningCnt.getAndIncrement();
-                  } else if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
-                    unknownCnt.getAndIncrement();
-                  }
-                });
-        if (runningCnt.get() == testDataNodeNum - 1 && unknownCnt.get() == 1) {
-          isDataNodeShutdown = true;
-          break;
-        }
-
-        TimeUnit.SECONDS.sleep(1);
-      }
-      Assert.assertTrue(isDataNodeShutdown);
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Collections.singletonList(NodeStatus.Unknown));
 
       // Check leader distribution
       isDistributionBalanced = false;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 0a95840b71..0fec7296ef 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -57,6 +57,7 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -402,6 +403,10 @@ public class IoTDBPartitionDurableIT {
   public void testUnknownDataNode() throws Exception {
     // Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
     EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+    EnvFactory.getEnv()
+        .ensureNodeStatus(
+            Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+            Collections.singletonList(NodeStatus.Unknown));
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -551,21 +556,10 @@ public class IoTDBPartitionDurableIT {
       Assert.assertEquals(unknownCnt * 2, runningCnt);
 
       EnvFactory.getEnv().startDataNode(testDataNodeId);
-      // Wait for heartbeat check
-      while (true) {
-        boolean containUnknown = false;
-        showClusterResp = client.showCluster();
-        for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
-          if (NodeStatus.Unknown.getStatus()
-              .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
-            containUnknown = true;
-            break;
-          }
-        }
-        if (!containUnknown) {
-          break;
-        }
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+              Collections.singletonList(NodeStatus.Running));
 
       // All Regions should alive after the testDataNode is restarted
       boolean allRunning = true;