You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/14 21:41:31 UTC
[helix] branch master updated: Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 64c95ab Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)
64c95ab is described below
commit 64c95ab2aee8cb3fdf777e913413f83dcbc09a46
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Apr 14 14:41:21 2020 -0700
Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)
Remove all the hardcoded thread sleeps in the test case and replace them with Verifiers.
---
.../controller/stages/TestRebalancePipeline.java | 71 +++++++++++++---------
1 file changed, 41 insertions(+), 30 deletions(-)
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index e78b1d5..723185b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -158,14 +158,19 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
- // round1: controller sends O->S to both node0 and node1
- Thread.sleep(1000);
+ // round1: controller sends O->S to both node0 and node1
Builder keyBuilder = accessor.keyBuilder();
- List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertEquals(messages.size(), 1);
- messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
- Assert.assertEquals(messages.size(), 1);
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (LiveInstance liveInstance : liveInstances) {
+ List<String> messages =
+ accessor.getChildNames(keyBuilder.messages(liveInstance.getInstanceName()));
+ if (messages.size() < 1) {
+ return false;
+ }
+ }
+ return true;
+ }, 2000));
// round2: node0 and node1 update current states but not removing messages
// Since controller's rebalancer pipeline will GC pending messages after timeout, and both hosts
@@ -176,25 +181,37 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(1).getEphemeralOwner(),
"SLAVE", true);
- // Controller has timeout > 1sec, so after 1s, controller should not have GCed message
- Thread.sleep(1000);
- Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_0")).size(), 1);
- Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_1")).size(), 1);
+ // Controller has timeout > 1sec, so within 1s, controller should not have GCed message
+ Assert.assertTrue(msgPurgeDelay > 1000);
+ Assert.assertFalse(TestHelper.verify(() -> {
+ for (LiveInstance liveInstance : liveInstances) {
+ List<String> messages =
+ accessor.getChildNames(keyBuilder.messages(liveInstance.getInstanceName()));
+ if (messages.size() >= 1) {
+ return false;
+ }
+ }
+ return true;
+ }, 1000));
// After another purge delay, controller should cleanup messages and continue to rebalance
Thread.sleep(msgPurgeDelay);
// Manually trigger another rebalance by touching current state
- setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
- "SLAVE");
- Thread.sleep(1000);
-
- List<Message> host0Msg = accessor.getChildValues(keyBuilder.messages("localhost_0"));
- List<Message> host1Msg = accessor.getChildValues(keyBuilder.messages("localhost_1"));
- List<Message> allMsgs = new ArrayList<>(host0Msg);
- allMsgs.addAll(host1Msg);
- Assert.assertEquals(allMsgs.size(), 1);
- Assert.assertEquals(allMsgs.get(0).getToState(), "MASTER");
- Assert.assertEquals(allMsgs.get(0).getFromState(), "SLAVE");
+ List<Message> allMsgs = new ArrayList<>();
+ setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0",
+ liveInstances.get(0).getEphemeralOwner(), "SLAVE");
+ Assert.assertTrue(TestHelper.verify(() -> {
+ allMsgs.clear();
+ for (LiveInstance liveInstance : liveInstances) {
+ allMsgs
+ .addAll(accessor.getChildValues(keyBuilder.messages(liveInstance.getInstanceName())));
+ }
+ if (allMsgs.size() != 1 || !allMsgs.get(0).getToState().equals("MASTER") || !allMsgs.get(0)
+ .getFromState().equals("SLAVE")) {
+ return false;
+ }
+ return true;
+ }, 2000));
// round3: node0 changes state to master, but failed to delete message,
// controller will clean it up
@@ -204,8 +221,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// touch current state to trigger rebalance
setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
"MASTER", false);
- Thread.sleep(1000);
- Assert.assertTrue(accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
+ Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 2000));
// round4: node0 has duplicated but valid message, i.e. there is a P2P message sent to it
// due to error in the triggered pipeline, controller should remove duplicated message
@@ -216,18 +232,13 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
dupMsg.getRecord().setListFields(sourceMsg.getRecord().getListFields());
dupMsg.getRecord().setMapFields(sourceMsg.getRecord().getMapFields());
accessor.setProperty(dupMsg.getKey(accessor.keyBuilder(), dupMsg.getTgtName()), dupMsg);
- Thread.sleep(1000);
-
- messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertTrue(messages.isEmpty());
+ Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 1500));
// round5: node0 has completely invalid message, controller should immediately delete it
dupMsg.setFromState("SLAVE");
dupMsg.setToState("OFFLINE");
accessor.setProperty(dupMsg.getKey(accessor.keyBuilder(), dupMsg.getTgtName()), dupMsg);
- Thread.sleep(1000);
- messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertTrue(messages.isEmpty());
+ Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 1500));
if (controller.isConnected()) {
controller.syncStop();