You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/14 21:41:31 UTC

[helix] branch master updated: Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c95ab  Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)
64c95ab is described below

commit 64c95ab2aee8cb3fdf777e913413f83dcbc09a46
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Apr 14 14:41:21 2020 -0700

    Fix unstable test TestRebalancePipeline.testMsgTriggeredRebalance() (#953)
    
    Remove the hardcoded thread sleeps in the test case and replace them with verifiers.
---
 .../controller/stages/TestRebalancePipeline.java   | 71 +++++++++++++---------
 1 file changed, 41 insertions(+), 30 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index e78b1d5..723185b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -158,14 +158,19 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
-    // round1: controller sends O->S to both node0 and node1
-    Thread.sleep(1000);
 
+    // round1: controller sends O->S to both node0 and node1
     Builder keyBuilder = accessor.keyBuilder();
-    List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
-    Assert.assertEquals(messages.size(), 1);
-    messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
-    Assert.assertEquals(messages.size(), 1);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (LiveInstance liveInstance : liveInstances) {
+        List<String> messages =
+            accessor.getChildNames(keyBuilder.messages(liveInstance.getInstanceName()));
+        if (messages.size() < 1) {
+          return false;
+        }
+      }
+      return true;
+    }, 2000));
 
     // round2: node0 and node1 update current states but not removing messages
     // Since controller's rebalancer pipeline will GC pending messages after timeout, and both hosts
@@ -176,25 +181,37 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(1).getEphemeralOwner(),
         "SLAVE", true);
 
-    // Controller has timeout > 1sec, so after 1s, controller should not have GCed message
-    Thread.sleep(1000);
-    Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_0")).size(), 1);
-    Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_1")).size(), 1);
+    // Controller has timeout > 1sec, so within 1s, controller should not have GCed message
+    Assert.assertTrue(msgPurgeDelay > 1000);
+    Assert.assertFalse(TestHelper.verify(() -> {
+      for (LiveInstance liveInstance : liveInstances) {
+        List<String> messages =
+            accessor.getChildNames(keyBuilder.messages(liveInstance.getInstanceName()));
+        if (messages.size() >= 1) {
+          return false;
+        }
+      }
+      return true;
+    }, 1000));
 
     // After another purge delay, controller should cleanup messages and continue to rebalance
     Thread.sleep(msgPurgeDelay);
     // Manually trigger another rebalance by touching current state
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
-        "SLAVE");
-    Thread.sleep(1000);
-
-    List<Message> host0Msg = accessor.getChildValues(keyBuilder.messages("localhost_0"));
-    List<Message> host1Msg = accessor.getChildValues(keyBuilder.messages("localhost_1"));
-    List<Message> allMsgs = new ArrayList<>(host0Msg);
-    allMsgs.addAll(host1Msg);
-    Assert.assertEquals(allMsgs.size(), 1);
-    Assert.assertEquals(allMsgs.get(0).getToState(), "MASTER");
-    Assert.assertEquals(allMsgs.get(0).getFromState(), "SLAVE");
+    List<Message> allMsgs = new ArrayList<>();
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0",
+        liveInstances.get(0).getEphemeralOwner(), "SLAVE");
+    Assert.assertTrue(TestHelper.verify(() -> {
+      allMsgs.clear();
+      for (LiveInstance liveInstance : liveInstances) {
+        allMsgs
+            .addAll(accessor.getChildValues(keyBuilder.messages(liveInstance.getInstanceName())));
+      }
+      if (allMsgs.size() != 1 || !allMsgs.get(0).getToState().equals("MASTER") || !allMsgs.get(0)
+          .getFromState().equals("SLAVE")) {
+        return false;
+      }
+      return true;
+    }, 2000));
 
     // round3: node0 changes state to master, but failed to delete message,
     // controller will clean it up
@@ -204,8 +221,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // touch current state to trigger rebalance
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "MASTER", false);
-    Thread.sleep(1000);
-    Assert.assertTrue(accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
+    Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 2000));
 
     // round4: node0 has duplicated but valid message, i.e. there is a P2P message sent to it
     // due to error in the triggered pipeline, controller should remove duplicated message
@@ -216,18 +232,13 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     dupMsg.getRecord().setListFields(sourceMsg.getRecord().getListFields());
     dupMsg.getRecord().setMapFields(sourceMsg.getRecord().getMapFields());
     accessor.setProperty(dupMsg.getKey(accessor.keyBuilder(), dupMsg.getTgtName()), dupMsg);
-    Thread.sleep(1000);
-
-    messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
-    Assert.assertTrue(messages.isEmpty());
+    Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 1500));
 
     // round5: node0 has completely invalid message, controller should immediately delete it
     dupMsg.setFromState("SLAVE");
     dupMsg.setToState("OFFLINE");
     accessor.setProperty(dupMsg.getKey(accessor.keyBuilder(), dupMsg.getTgtName()), dupMsg);
-    Thread.sleep(1000);
-    messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
-    Assert.assertTrue(messages.isEmpty());
+    Assert.assertTrue(TestHelper.verify(() -> accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty(), 1500));
 
     if (controller.isConnected()) {
       controller.syncStop();