You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/01/12 06:12:13 UTC
[iotdb] 01/01: [IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch optimize/iotdb-5403
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e28625ce88abfa46125f731450c24ed48abdbd01
Author: ericpai <er...@hotmail.com>
AuthorDate: Thu Jan 12 14:11:56 2023 +0800
[IOTDB-5403] Refine IT: Add ensureNodeStatus to test status after starting or stopping
---
.../apache/iotdb/it/env/cluster/AbstractEnv.java | 64 ++++++++++++++++++++--
.../iotdb/it/env/cluster/AbstractNodeWrapper.java | 7 ---
.../iotdb/it/env/remote/RemoteServerEnv.java | 7 +++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 13 +++++
.../apache/iotdb/itbase/env/BaseNodeWrapper.java | 2 -
.../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 60 +++++---------------
.../load/IoTDBClusterRegionLeaderBalancingIT.java | 28 ++--------
.../it/partition/IoTDBPartitionDurableIT.java | 24 +++-----
8 files changed, 106 insertions(+), 99 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
index 3f1c32d58c..eade9e47d0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
@@ -18,17 +18,22 @@
*/
package org.apache.iotdb.it.env.cluster;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
import org.apache.iotdb.itbase.env.ClusterConfig;
import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
import org.apache.iotdb.itbase.runtime.NodeConnection;
@@ -43,6 +48,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import java.io.File;
@@ -52,6 +58,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -288,7 +295,6 @@ public abstract class AbstractEnv implements BaseEnv {
Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream())
.collect(Collectors.toList())) {
nodeWrapper.stop();
- nodeWrapper.waitingToShutDown();
nodeWrapper.destroyDir();
String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
if (!new File(lockPath).delete()) {
@@ -418,6 +424,7 @@ public abstract class AbstractEnv implements BaseEnv {
this.testMethodName = testMethodName;
}
+ @Override
public void dumpTestJVMSnapshot() {
for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
configNodeWrapper.dumpJVMSnapshot(testMethodName);
@@ -462,9 +469,6 @@ public abstract class AbstractEnv implements BaseEnv {
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Only the ConfigNodeClient who connects to the ConfigNode-leader
// will respond the SUCCESS_STATUS
- logger.info(
- "Successfully get connection to the leader ConfigNode: {}",
- configNodeWrapper.getIpAndPortString());
return client;
} else {
// Return client otherwise
@@ -656,10 +660,62 @@ public abstract class AbstractEnv implements BaseEnv {
dataNodeWrapperList.get(index).start();
}
+ @Override
public void shutdownDataNode(int index) {
dataNodeWrapperList.get(index).stop();
}
+ @Override
+ public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+ throws IllegalStateException {
+ Throwable lastException = null;
+ for (int i = 0; i < 30; i++) {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ List<String> errorMessages = new ArrayList<>(nodes.size());
+ Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
+ TShowClusterResp showClusterResp = client.showCluster();
+ for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) {
+ nodeIds.put(
+ node.getInternalEndPoint().getIp() + ":" + node.getInternalEndPoint().getPort(),
+ node.getConfigNodeId());
+ }
+ for (TDataNodeLocation node : showClusterResp.getDataNodeList()) {
+ nodeIds.put(
+ node.getClientRpcEndPoint().getIp() + ":" + node.getClientRpcEndPoint().getPort(),
+ node.getDataNodeId());
+ }
+ for (int j = 0; j < nodes.size(); j++) {
+ String endpoint = nodes.get(j).getIpAndPortString();
+ if (!nodeIds.containsKey(endpoint)) {
+ throw new IllegalStateException(
+ "The node " + nodes.get(j).getIpAndPortString() + " is not found!");
+ }
+ String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
+ if (!targetStatus.get(j).getStatus().equals(status)) {
+ errorMessages.add(
+ String.format(
+ "Node %s is in status %s, but expected %s",
+ endpoint, status, targetStatus.get(j)));
+ }
+ }
+ if (errorMessages.isEmpty()) {
+ return;
+ } else {
+ lastException = new IllegalStateException(String.join(". ", errorMessages));
+ }
+ } catch (TException | ClientManagerException | IOException | InterruptedException e) {
+ lastException = e;
+ }
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ throw new IllegalStateException(lastException);
+ }
+
@Override
public int getMqttPort() {
int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index 6d3ed5ac7c..d462e3f19d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -291,13 +291,6 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
return;
}
this.instance.destroy();
- }
-
- @Override
- public void waitingToShutDown() {
- if (this.instance == null) {
- return;
- }
try {
if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
index 4a5618fdc8..e1844a5beb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
import org.apache.iotdb.itbase.env.ClusterConfig;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
@@ -170,6 +172,11 @@ public class RemoteServerEnv implements BaseEnv {
throw new UnsupportedOperationException();
}
+ @Override
+ public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 31dbf0d767..4132e7b6db 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.itbase.env;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
@@ -203,6 +204,18 @@ public interface BaseEnv {
/** Shutdown an existed ConfigNode */
void shutdownConfigNode(int index);
+ /**
+ * Ensure that every node is in its corresponding target status.
+ *
+ * @param nodes the list of nodes to query.
+ * @param targetStatus the target {@link NodeStatus} of each node. It must have the same length
+ * as nodes.
+ * @throws IllegalStateException if some nodes are still not in their target status after
+ * repeated checks.
+ */
+ void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
+ throws IllegalStateException;
+
/**
* Get the {@link ConfigNodeWrapper} of the specified index.
*
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
index 32bc15d386..e0ab3ebd0f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java
@@ -28,8 +28,6 @@ public interface BaseNodeWrapper {
void stop();
- void waitingToShutDown();
-
String getIp();
int getPort();
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index b42e0c1071..8ec75c86d3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
@@ -286,29 +285,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
// Shutdown and check
EnvFactory.getEnv().shutdownConfigNode(1);
EnvFactory.getEnv().shutdownDataNode(0);
- int retryTimes;
- for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
- AtomicInteger unknownCnt = new AtomicInteger(0);
- showClusterResp = client.showCluster();
- showClusterResp
- .getNodeStatus()
- .forEach(
- (nodeId, status) -> {
- if (NodeStatus.Unknown.equals(NodeStatus.parse(status))) {
- unknownCnt.getAndIncrement();
- }
- });
-
- if (unknownCnt.get() == testNodeNum - 2) {
- break;
- }
- TimeUnit.SECONDS.sleep(1);
- }
- logger.info(showClusterStatus(showClusterResp));
- if (retryTimes >= maxRetryTimes) {
- Assert.fail(
- "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
- }
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Arrays.asList(
+ EnvFactory.getEnv().getConfigNodeWrapper(1),
+ EnvFactory.getEnv().getDataNodeWrapper(0)),
+ Arrays.asList(NodeStatus.Unknown, NodeStatus.Unknown));
/* Restart and updatePeer */
// TODO: @Itami-sho, enable this test and delete it
@@ -338,28 +320,12 @@ public class IoTDBClusterNodeErrorStartUpIT {
// Restart and check
EnvFactory.getEnv().startConfigNode(1);
EnvFactory.getEnv().startDataNode(0);
- for (retryTimes = 0; retryTimes < maxRetryTimes; retryTimes++) {
- AtomicInteger runningCnt = new AtomicInteger(0);
- showClusterResp = client.showCluster();
- showClusterResp
- .getNodeStatus()
- .forEach(
- (nodeId, status) -> {
- if (NodeStatus.Running.equals(NodeStatus.parse(status))) {
- runningCnt.getAndIncrement();
- }
- });
-
- if (runningCnt.get() == testNodeNum) {
- break;
- }
- TimeUnit.SECONDS.sleep(1);
- }
- logger.info(showClusterStatus(showClusterResp));
- if (retryTimes >= maxRetryTimes) {
- Assert.fail(
- "The running nodes are still insufficient after retrying " + maxRetryTimes + " times");
- }
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Arrays.asList(
+ EnvFactory.getEnv().getConfigNodeWrapper(1),
+ EnvFactory.getEnv().getDataNodeWrapper(0)),
+ Arrays.asList(NodeStatus.Running, NodeStatus.Running));
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 42de1b6551..0339f3efc0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -198,30 +197,11 @@ public class IoTDBClusterRegionLeaderBalancingIT {
Assert.assertTrue(isDistributionBalanced);
// Shutdown a DataNode
- boolean isDataNodeShutdown = false;
EnvFactory.getEnv().shutdownDataNode(0);
- for (int retry = 0; retry < retryNum; retry++) {
- AtomicInteger runningCnt = new AtomicInteger(0);
- AtomicInteger unknownCnt = new AtomicInteger(0);
- TShowDataNodesResp showDataNodesResp = client.showDataNodes();
- showDataNodesResp
- .getDataNodesInfoList()
- .forEach(
- dataNodeInfo -> {
- if (NodeStatus.Running.getStatus().equals(dataNodeInfo.getStatus())) {
- runningCnt.getAndIncrement();
- } else if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
- unknownCnt.getAndIncrement();
- }
- });
- if (runningCnt.get() == testDataNodeNum - 1 && unknownCnt.get() == 1) {
- isDataNodeShutdown = true;
- break;
- }
-
- TimeUnit.SECONDS.sleep(1);
- }
- Assert.assertTrue(isDataNodeShutdown);
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(0)),
+ Collections.singletonList(NodeStatus.Unknown));
// Check leader distribution
isDistributionBalanced = false;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 0a95840b71..0fec7296ef 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -57,6 +57,7 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -402,6 +403,10 @@ public class IoTDBPartitionDurableIT {
public void testUnknownDataNode() throws Exception {
// Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+ Collections.singletonList(NodeStatus.Unknown));
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -551,21 +556,10 @@ public class IoTDBPartitionDurableIT {
Assert.assertEquals(unknownCnt * 2, runningCnt);
EnvFactory.getEnv().startDataNode(testDataNodeId);
- // Wait for heartbeat check
- while (true) {
- boolean containUnknown = false;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- containUnknown = true;
- break;
- }
- }
- if (!containUnknown) {
- break;
- }
- }
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId)),
+ Collections.singletonList(NodeStatus.Running));
// All Regions should alive after the testDataNode is restarted
boolean allRunning = true;