You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/02/02 22:43:47 UTC

[helix] branch master updated: Use WAGED rebalancer as the default rebalancer for the controller resources in the super cluster. (#1619)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 282c469  Use WAGED rebalancer as the default rebalancer for the controller resources in the super cluster. (#1619)
282c469 is described below

commit 282c4690088fd05b38172a95da7473374f79b83b
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue Feb 2 14:43:34 2021 -0800

    Use WAGED rebalancer as the default rebalancer for the controller resources in the super cluster. (#1619)
    
    Use the WAGED rebalancer as the default rebalancer for the controller resources in the super cluster.
    
    Regarding the test changes: add a workaround for issue #1617. Note that this issue does not affect production, since a single JVM should never run more than one GenericHelixController.
---
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  9 +--
 .../apache/helix/integration/TestAddClusterV2.java | 11 +++-
 .../org/apache/helix/tools/TestHelixAdminCli.java  | 66 ++++++++++++----------
 3 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index df5abe4..62840b6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1266,11 +1266,10 @@ public class ZKHelixAdmin implements HelixAdmin {
     idealState.setNumPartitions(1);
     idealState.setStateModelDefRef("LeaderStandby");
     idealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
-    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
-    idealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    idealState.setRebalancerClassName(WagedRebalancer.class.getName());
     // TODO: Give user an option, say from RestAPI to config the number of replicas.
     idealState.setReplicas(Integer.toString(DEFAULT_SUPERCLUSTER_REPLICA));
-    idealState.getRecord().setListField(clusterName, new ArrayList<String>());
+    idealState.getRecord().setListField(clusterName, new ArrayList<>());
 
     List<String> controllers = getInstancesInCluster(grandCluster);
     if (controllers.size() == 0) {
@@ -1278,10 +1277,12 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+        new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<>(_zkClient));
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
+    LOG.info("Cluster {} has been added to grand cluster {} with rebalance configuration {}.",
+        clusterName, grandCluster, idealState.getRecord().getSimpleFields().toString());
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index cf4c580..d1dfee8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -21,10 +21,13 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -90,7 +93,13 @@ public class TestAddClusterV2 extends ZkTestBase {
 
   @Test
   public void Test() {
-
+    // Verify the super cluster has controller resources and all are rebalanced by WAGED.
+    HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+    Assert.assertFalse(admin.getResourcesInCluster(CONTROLLER_CLUSTER).isEmpty());
+    for (String clusterName : admin.getResourcesInCluster(CONTROLLER_CLUSTER)) {
+      IdealState is = admin.getResourceIdealState(CONTROLLER_CLUSTER, clusterName);
+      Assert.assertEquals(is.getRebalancerClassName(), WagedRebalancer.class.getName());
+    }
   }
 
   @AfterClass
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index 2a9ec33..ae92437 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -27,10 +27,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -43,6 +42,8 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.store.ZNRecordJsonSerializer;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.ITestContext;
 import org.testng.annotations.AfterMethod;
@@ -250,10 +251,7 @@ public class TestHelixAdminCli extends ZkTestBase {
       // OK
     }
 
-    command =
-        "-zkSvr localhost:2183 -activateCluster " + clusterName + " " + grandClusterName + " true";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    Thread.sleep(500);
+    activateCluster();
 
     // drop a running cluster
     command = "-zkSvr localhost:2183 -dropCluster " + clusterName;
@@ -318,10 +316,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     MockParticipantManager[] participants = new MockParticipantManager[n];
     ClusterDistributedController[] controllers = new ClusterDistributedController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
-    String command =
-        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    Thread.sleep(500);
+    activateCluster();
 
     // save ideal state
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
@@ -339,7 +334,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     pw.write(new String(serializer.serialize(idealState.getRecord())));
     pw.close();
 
-    command = "-zkSvr " + ZK_ADDR + " -dropResource " + clusterName + " db_11 ";
+    String command = "-zkSvr " + ZK_ADDR + " -dropResource " + clusterName + " db_11 ";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
 
     boolean verifyResult = ClusterStateVerifier
@@ -420,13 +415,10 @@ public class TestHelixAdminCli extends ZkTestBase {
     MockParticipantManager[] participants = new MockParticipantManager[n];
     ClusterDistributedController[] controllers = new ClusterDistributedController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
-    String command =
-        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    Thread.sleep(500);
+    activateCluster();
 
     // drop node should fail if the node is not disabled
-    command = "-zkSvr " + ZK_ADDR + " -dropNode " + clusterName + " localhost:1232";
+    String command = "-zkSvr " + ZK_ADDR + " -dropNode " + clusterName + " localhost:1232";
     try {
       ClusterSetup.processCommandLineArgs(command.split("\\s+"));
       Assert.fail("dropNode should fail since the node is not disabled");
@@ -503,12 +495,9 @@ public class TestHelixAdminCli extends ZkTestBase {
     MockParticipantManager[] participants = new MockParticipantManager[n];
     ClusterDistributedController[] controllers = new ClusterDistributedController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
-    String command =
-        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    Thread.sleep(500);
+    activateCluster();
 
-    command = "-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+    String command = "-zkSvr " + ZK_ADDR + " -addNode " + clusterName
         + " localhost:12331;localhost:12341;localhost:12351;localhost:12361";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
 
@@ -566,21 +555,19 @@ public class TestHelixAdminCli extends ZkTestBase {
     MockParticipantManager[] participants = new MockParticipantManager[n];
     ClusterDistributedController[] controllers = new ClusterDistributedController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
-    String command =
-        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    activateCluster();
     // wait till grand_cluster converge
-    BestPossibleExternalViewVerifier verifier = new BestPossibleExternalViewVerifier.Builder(grandClusterName)
-        .setZkClient(_gZkClient)
-        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-        .build();
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(grandClusterName).setZkClient(_gZkClient)
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
 
     boolean result = verifier.verifyByPolling();
     Assert.assertTrue(result, "grand cluster not converging.");
 
     // deactivate cluster
-    command = "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName
-        + " false";
+    String command =
+        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName
+            + " false";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
 
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
@@ -728,4 +715,23 @@ public class TestHelixAdminCli extends ZkTestBase {
     command = "-zkSvr " + ZK_ADDR + " -rebalance " + clusterName + " db_11 2 -key alias";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
+
+  /** Activates clusterName in grandClusterName; waits until its ExternalView exists. */
+  private void activateCluster() throws Exception {
+    String command =
+        "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+    boolean activated = TestHelper.verify(() -> {
+      if (admin.getResourceExternalView(grandClusterName, clusterName) == null) {
+        // TODO: Remove once https://github.com/apache/helix/issues/1617 is fixed. With multiple
+        // GenericHelixController instances in one JVM, touch the IdealState to trigger a rebalance.
+        IdealState is = admin.getResourceIdealState(grandClusterName, clusterName);
+        admin.setResourceIdealState(grandClusterName, clusterName, is);
+        return false;
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(activated, "Failed to activate cluster " + clusterName);
+  }
 }