You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/02/02 22:43:47 UTC
[helix] branch master updated: Use WAGED rebalancer as the default
rebalancer for the controller resources in the super cluster. (#1619)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 282c469 Use WAGED rebalancer as the default rebalancer for the controller resources in the super cluster. (#1619)
282c469 is described below
commit 282c4690088fd05b38172a95da7473374f79b83b
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue Feb 2 14:43:34 2021 -0800
Use WAGED rebalancer as the default rebalancer for the controller resources in the super cluster. (#1619)
Use the WAGED rebalancer as the default rebalancer for the controller resources in the super cluster.
Regarding the test changes, this adds a workaround for issue #1617. Note that this issue does not affect production, since a single JVM should not run more than one GenericHelixController.
---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 9 +--
.../apache/helix/integration/TestAddClusterV2.java | 11 +++-
.../org/apache/helix/tools/TestHelixAdminCli.java | 66 ++++++++++++----------
3 files changed, 51 insertions(+), 35 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index df5abe4..62840b6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1266,11 +1266,10 @@ public class ZKHelixAdmin implements HelixAdmin {
idealState.setNumPartitions(1);
idealState.setStateModelDefRef("LeaderStandby");
idealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
- idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
- idealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+ idealState.setRebalancerClassName(WagedRebalancer.class.getName());
// TODO: Give user an option, say from RestAPI to config the number of replicas.
idealState.setReplicas(Integer.toString(DEFAULT_SUPERCLUSTER_REPLICA));
- idealState.getRecord().setListField(clusterName, new ArrayList<String>());
+ idealState.getRecord().setListField(clusterName, new ArrayList<>());
List<String> controllers = getInstancesInCluster(grandCluster);
if (controllers.size() == 0) {
@@ -1278,10 +1277,12 @@ public class ZKHelixAdmin implements HelixAdmin {
}
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
+ LOG.info("Cluster {} has been added to grand cluster {} with rebalance configuration {}.",
+ clusterName, grandCluster, idealState.getRecord().getSimpleFields().toString());
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index cf4c580..d1dfee8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -21,10 +21,13 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -90,7 +93,13 @@ public class TestAddClusterV2 extends ZkTestBase {
@Test
public void Test() {
-
+ // Verify the super cluster resources are all rebalanced by the WAGED rebalancer.
+ HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+ int verifiedResources = 0;
+ for (String clusterName : admin.getResourcesInCluster(CONTROLLER_CLUSTER)) {
+ IdealState is = admin.getResourceIdealState(CONTROLLER_CLUSTER, clusterName);
+ Assert.assertEquals(is.getRebalancerClassName(), WagedRebalancer.class.getName());
+ verifiedResources++;
+ }
+ // Guard against a vacuous pass if the controller cluster reports no resources.
+ Assert.assertTrue(verifiedResources > 0, "No resources found in " + CONTROLLER_CLUSTER);
}
@AfterClass
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index 2a9ec33..ae92437 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -27,10 +27,9 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -43,6 +42,8 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.store.ZNRecordJsonSerializer;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.AfterMethod;
@@ -250,10 +251,7 @@ public class TestHelixAdminCli extends ZkTestBase {
// OK
}
- command =
- "-zkSvr localhost:2183 -activateCluster " + clusterName + " " + grandClusterName + " true";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Thread.sleep(500);
+ activateCluster();
// drop a running cluster
command = "-zkSvr localhost:2183 -dropCluster " + clusterName;
@@ -318,10 +316,7 @@ public class TestHelixAdminCli extends ZkTestBase {
MockParticipantManager[] participants = new MockParticipantManager[n];
ClusterDistributedController[] controllers = new ClusterDistributedController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
- String command =
- "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Thread.sleep(500);
+ activateCluster();
// save ideal state
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
@@ -339,7 +334,7 @@ public class TestHelixAdminCli extends ZkTestBase {
pw.write(new String(serializer.serialize(idealState.getRecord())));
pw.close();
- command = "-zkSvr " + ZK_ADDR + " -dropResource " + clusterName + " db_11 ";
+ String command = "-zkSvr " + ZK_ADDR + " -dropResource " + clusterName + " db_11 ";
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
boolean verifyResult = ClusterStateVerifier
@@ -420,13 +415,10 @@ public class TestHelixAdminCli extends ZkTestBase {
MockParticipantManager[] participants = new MockParticipantManager[n];
ClusterDistributedController[] controllers = new ClusterDistributedController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
- String command =
- "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Thread.sleep(500);
+ activateCluster();
// drop node should fail if the node is not disabled
- command = "-zkSvr " + ZK_ADDR + " -dropNode " + clusterName + " localhost:1232";
+ String command = "-zkSvr " + ZK_ADDR + " -dropNode " + clusterName + " localhost:1232";
try {
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
Assert.fail("dropNode should fail since the node is not disabled");
@@ -503,12 +495,9 @@ public class TestHelixAdminCli extends ZkTestBase {
MockParticipantManager[] participants = new MockParticipantManager[n];
ClusterDistributedController[] controllers = new ClusterDistributedController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
- String command =
- "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Thread.sleep(500);
+ activateCluster();
- command = "-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ String command = "-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ " localhost:12331;localhost:12341;localhost:12351;localhost:12361";
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -566,21 +555,19 @@ public class TestHelixAdminCli extends ZkTestBase {
MockParticipantManager[] participants = new MockParticipantManager[n];
ClusterDistributedController[] controllers = new ClusterDistributedController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
- String command =
- "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ activateCluster();
// wait till grand_cluster converge
- BestPossibleExternalViewVerifier verifier = new BestPossibleExternalViewVerifier.Builder(grandClusterName)
- .setZkClient(_gZkClient)
- .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
- .build();
+ BestPossibleExternalViewVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(grandClusterName).setZkClient(_gZkClient)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
boolean result = verifier.verifyByPolling();
Assert.assertTrue(result, "grand cluster not converging.");
// deactivate cluster
- command = "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName
- + " false";
+ String command =
+ "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName
+ + " false";
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
@@ -728,4 +715,23 @@ public class TestHelixAdminCli extends ZkTestBase {
command = "-zkSvr " + ZK_ADDR + " -rebalance " + clusterName + " db_11 2 -key alias";
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
}
+
+ /** Activates clusterName under grandClusterName, then waits for its ExternalView to appear. */
+ private void activateCluster() throws Exception {
+ String command =
+ "-zkSvr " + ZK_ADDR + " -activateCluster " + clusterName + " " + grandClusterName + " true";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+ boolean activated = TestHelper.verify(() -> {
+ if (admin.getResourceExternalView(grandClusterName, clusterName) == null) {
+ // TODO: Remove this workaround once https://github.com/apache/helix/issues/1617 is fixed.
+ // Touch the IdealState to trigger a new rebalance; needed since this test runs multiple controllers in one JVM.
+ IdealState is = admin.getResourceIdealState(grandClusterName, clusterName);
+ admin.setResourceIdealState(grandClusterName, clusterName, is);
+ return false;
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(activated, "Cluster " + clusterName + " is not activated in " + grandClusterName);
+ }
}