You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/01/12 06:12:12 UTC

[iotdb] branch optimize/iotdb-5403 created (now e28625ce88)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch optimize/iotdb-5403
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e28625ce88 [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping

This branch includes the following new commits:

     new e28625ce88 [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch optimize/iotdb-5403
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e28625ce88abfa46125f731450c24ed48abdbd01
Author: ericpai <er...@hotmail.com>
AuthorDate: Thu Jan 12 14:11:56 2023 +0800

    [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping
---
 .../apache/iotdb/it/env/cluster/AbstractEnv.java   | 64 ++++++++++++++++++++--
 .../iotdb/it/env/cluster/AbstractNodeWrapper.java  |  7 ---
 .../iotdb/it/env/remote/RemoteServerEnv.java       |  7 +++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  | 13 +++++
 .../apache/iotdb/itbase/env/BaseNodeWrapper.java   |  2 -
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 60 +++++---------------
 .../load/IoTDBClusterRegionLeaderBalancingIT.java  | 28 ++--------
 .../it/partition/IoTDBPartitionDurableIT.java      | 24 +++-----
 8 files changed, 106 insertions(+), 99 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
index 3f1c32d58c..eade9e47d0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
@@ -18,17 +18,22 @@
  */
 package org.apache.iotdb.it.env.cluster;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestLogger;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.itbase.env.ClusterConfig;
 import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
 import org.apache.iotdb.itbase.runtime.NodeConnection;
@@ -43,6 +48,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.pool.SessionPool;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -52,6 +58,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -288,7 +295,6 @@ public abstract class AbstractEnv implements BaseEnv {
         Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream())
             .collect(Collectors.toList())) {
       nodeWrapper.stop();
-      nodeWrapper.waitingToShutDown();
       nodeWrapper.destroyDir();
       String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
       if (!new File(lockPath).delete()) {
@@ -418,6 +424,7 @@ public abstract class AbstractEnv implements BaseEnv {
     this.testMethodName = testMethodName;
   }
 
+  @Override
   public void dumpTestJVMSnapshot() {
     for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
       configNodeWrapper.dumpJVMSnapshot(testMethodName);
@@ -462,9 +469,6 @@ public abstract class AbstractEnv implements BaseEnv {
           if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             // Only the ConfigNodeClient who connects to the ConfigNode-leader
             // will respond the SUCCESS_STATUS
-            logger.info(
-                "Successfully get connection to the leader ConfigNode: {}",
-                configNodeWrapper.getIpAndPortString());
             return client;
           } else {
             // Return client otherwise
@@ -656,10 +660,62 @@ public abstract class AbstractEnv implements BaseEnv {
     dataNodeWrapperList.get(index).start();
   }
 
+  @Override
   public void shutdownDataNode(int index) {
     dataNodeWrapperList.get(index).stop();
   }
 
+  @Override
+  public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+      throws IllegalStateException {
+    Throwable lastException = null;
+    for (int i = 0; i < 30; i++) {
+      try (SyncConfigNodeIServiceClient client =
+          (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        List<String> errorMessages = new ArrayList<>(nodes.size());
+        Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
+        TShowClusterResp showClusterResp = client.showCluster();
+        for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) {
+          nodeIds.put(
+              node.getInternalEndPoint().getIp() + ":" + node.getInternalEndPoint().getPort(),
+              node.getConfigNodeId());
+        }
+        for (TDataNodeLocation node : showClusterResp.getDataNodeList()) {
+          nodeIds.put(
+              node.getClientRpcEndPoint().getIp() + ":" + node.getClientRpcEndPoint().getPort(),
+              node.getDataNodeId());
+        }
+        for (int j = 0; j < nodes.size(); j++) {
+          String endpoint = nodes.get(j).getIpAndPortString();
+          if (!nodeIds.containsKey(endpoint)) {
+            throw new IllegalStateException(
+                "The node " + nodes.get(j).getIpAndPortString() + " is not found!");
+          }
+          String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
+          if (!targetStatus.get(j).getStatus().equals(status)) {
+            errorMessages.add(
+                String.format(
+                    "Node %s is in status %s, but expected %s",
+                    endpoint, status, targetStatus.get(j)));
+          }
+        }
+        if (errorMessages.isEmpty()) {
+          return;
+        } else {
+          lastException = new IllegalStateException(String.join(". ", errorMessages));
+        }
+      } catch (TException | ClientManagerException | IOException | InterruptedException e) {
+        lastException = e;
+      }
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    throw new IllegalStateException(lastException);
+  }
+
   @Override
   public int getMqttPort() {
     int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index 6d3ed5ac7c..d462e3f19d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -291,13 +291,6 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
       return;
     }
     this.instance.destroy();
-  }
-
-  @Override
-  public void waitingToShutDown() {
-    if (this.instance == null) {
-      return;
-    }
     try {
       if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
         this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
index 4a5618fdc8..e1844a5beb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.itbase.env.ClusterConfig;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.Constant;
@@ -170,6 +172,11 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
     throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 31dbf0d767..4132e7b6db 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.itbase.env;
 
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
@@ -203,6 +204,18 @@ public interface BaseEnv {
   /** Shutdown an existed ConfigNode */
   void shutdownConfigNode(int index);
 
+  /**
+   * Ensure that all the nodes are in their corresponding status.
+   *
+   * @param nodes the nodes list to query.
+   * @param targetStatus the target {@link NodeStatus} of each node. It should have the same length
+   *     as nodes.
+   * @throws IllegalStateException if some nodes are still not in the targetStatus after repeated
+   *     checks.
+   */
+  void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+      throws IllegalStateException;
+
   /**
    * Get the {@link ConfigNodeWrapper} of the specified index.
    *
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
index 32bc15d386..e0ab3ebd0f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
@@ -28,8 +28,6 @@ public interface BaseNodeWrapper {
 
   void stop();
 
-  void waitingToShutDown();
-
   String getIp();
 
   int getPort();
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index b42e0c1071..8ec75c86d3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
@@ -286,29 +285,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
       // Shutdown and check
       EnvFactory.getEnv().shutdownConfigNode(1);
       EnvFactory.getEnv().shutdownDataNode(0);
-      int retryTimes;
-      for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
-        AtomicInteger unknownCnt = new AtomicInteger(0);
-        showClusterResp = client.showCluster();
-        showClusterResp
-            .getNodeStatus()
-            .forEach(
-                (nodeId, status) -> {
-                  if (NodeStatus.Unknown.equals(NodeStatus.parse(status))) {
-                    unknownCnt.getAndIncrement();
-                  }
-                });
-
-        if (unknownCnt.get() == testNodeNum - 2) {
-          break;
-        }
-        TimeUnit.SECONDS.sleep(1);
-      }
-      logger.info(showClusterStatus(showClusterResp));
-      if (retryTimes >= maxRetryTimes) {
-        Assert.fail(
-            "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Arrays.asList(
+                  EnvFactory.getEnv().getConfigNodeWrapper(1),
+                  EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Arrays.asList(NodeStatus.Unknown, NodeStatus.Unknown));
 
       /* Restart and updatePeer */
       // TODO: @Itami-sho, enable this test and delete it
@@ -338,28 +320,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
       // Restart and check
       EnvFactory.getEnv().startConfigNode(1);
       EnvFactory.getEnv().startDataNode(0);
-      for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
-        AtomicInteger runningCnt = new AtomicInteger(0);
-        showClusterResp = client.showCluster();
-        showClusterResp
-            .getNodeStatus()
-            .forEach(
-                (nodeId, status) -> {
-                  if (NodeStatus.Running.equals(NodeStatus.parse(status))) {
-                    runningCnt.getAndIncrement();
-                  }
-                });
-
-        if (runningCnt.get() == testNodeNum) {
-          break;
-        }
-        TimeUnit.SECONDS.sleep(1);
-      }
-      logger.info(showClusterStatus(showClusterResp));
-      if (retryTimes >= maxRetryTimes) {
-        Assert.fail(
-            "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Arrays.asList(
+                  EnvFactory.getEnv().getConfigNodeWrapper(1),
+                  EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Arrays.asList(NodeStatus.Running, NodeStatus.Running));
     }
   }
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 42de1b6551..0339f3efc0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -198,30 +197,11 @@ public class IoTDBClusterRegionLeaderBalancingIT {
       Assert.assertTrue(isDistributionBalanced);
 
       // Shutdown a DataNode
-      boolean isDataNodeShutdown = false;
       EnvFactory.getEnv().shutdownDataNode(0);
-      for (int retry = 0; retry < retryNum; retry++) {
-        AtomicInteger runningCnt = new AtomicInteger(0);
-        AtomicInteger unknownCnt = new AtomicInteger(0);
-        TShowDataNodesResp showDataNodesResp = client.showDataNodes();
-        showDataNodesResp
-            .getDataNodesInfoList()
-            .forEach(
-                dataNodeInfo -> {
-                  if (NodeStatus.Running.getStatus().equals(dataNodeInfo.getStatus())) {
-                    runningCnt.getAndIncrement();
-                  } else if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
-                    unknownCnt.getAndIncrement();
-                  }
-                });
-        if (runningCnt.get() == testDataNodeNum - 1 && unknownCnt.get() == 1) {
-          isDataNodeShutdown = true;
-          break;
-        }
-
-        TimeUnit.SECONDS.sleep(1);
-      }
-      Assert.assertTrue(isDataNodeShutdown);
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Collections.singletonList(NodeStatus.Unknown));
 
       // Check leader distribution
       isDistributionBalanced = false;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 0a95840b71..0fec7296ef 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -57,6 +57,7 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -402,6 +403,10 @@ public class IoTDBPartitionDurableIT {
   public void testUnknownDataNode() throws Exception {
     // Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
     EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+    EnvFactory.getEnv()
+        .ensureNodeStatus(
+            Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+            Collections.singletonList(NodeStatus.Unknown));
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -551,21 +556,10 @@ public class IoTDBPartitionDurableIT {
       Assert.assertEquals(unknownCnt * 2, runningCnt);
 
       EnvFactory.getEnv().startDataNode(testDataNodeId);
-      // Wait for heartbeat check
-      while (true) {
-        boolean containUnknown = false;
-        showClusterResp = client.showCluster();
-        for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
-          if (NodeStatus.Unknown.getStatus()
-              .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
-            containUnknown = true;
-            break;
-          }
-        }
-        if (!containUnknown) {
-          break;
-        }
-      }
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+              Collections.singletonList(NodeStatus.Running));
 
       // All Regions should alive after the testDataNode is restarted
       boolean allRunning = true;