You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/08/14 23:48:03 UTC
[helix] branch master updated: Don't skip WAGED rebalancer
calculation even the resource list is empty. (#1271)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new a901c20 Don't skip WAGED rebalancer calculation even the resource list is empty. (#1271)
a901c20 is described below
commit a901c201a00d4b96294020ef82bc113fea196e8a
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Fri Aug 14 16:47:57 2020 -0700
Don't skip WAGED rebalancer calculation even the resource list is empty. (#1271)
This change allows the rebalancer logic to clean up its internal assignment state and cache when all the resources are removed from the cluster.
---
.../rebalancer/waged/WagedRebalancer.java | 22 -------
.../stages/BestPossibleStateCalcStage.java | 2 +-
.../rebalancer/waged/TestWagedRebalancer.java | 3 +
.../WagedRebalancer/TestWagedRebalance.java | 70 ++++++++++++++++++++--
4 files changed, 68 insertions(+), 29 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e1f5d4a..89ecc47 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -253,16 +253,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
- if (resourceMap.isEmpty()) {
- LOG.debug(
- "There is no resource to be rebalanced by {}. Reset the persisted assignment state if any.",
- this.getClass().getSimpleName());
- // Clean up the persisted assignment records so if the resources are added back to WAGED, they
- // will be recalculated as a new one.
- clearAssignmentMetadata();
- return Collections.emptyMap();
- }
-
LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
validateInput(clusterData, resourceMap);
@@ -468,18 +458,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
- private void clearAssignmentMetadata() {
- if (_assignmentMetadataStore != null) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.clearAssignmentMetadata();
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- LOG.error("Failed to clear the assignment metadata.", ex);
- }
- }
- }
-
/**
* Calculate and update the Baseline assignment
* @param clusterModel
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 32d3b42..bca7a37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -292,7 +292,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
} else {
failureResources.add(resource.getResourceName());
LogUtil.logWarn(logger, _eventId, String
- .format("Failed to calculate best possible states for %s.",
+ .format("The calculated best possible states for %s is empty or invalid.",
resource.getResourceName()));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index fe5fb02..cfcbe29 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -150,6 +150,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.assertFalse(_metadataStore.getBaseline().isEmpty());
Assert.assertFalse(_metadataStore.getBestPossibleAssignment().isEmpty());
// Calculate with empty resource list. The rebalancer shall clean up all the assignment status.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
+ clusterData.getIdealStates().clear();
newIdealStates = rebalancer
.computeNewIdealStates(clusterData, Collections.emptyMap(), new CurrentStateOutput());
Assert.assertTrue(newIdealStates.isEmpty());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index c232462..b778b3c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -36,9 +36,11 @@ import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
@@ -66,6 +68,7 @@ public class TestWagedRebalance extends ZkTestBase {
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
protected ClusterControllerManager _controller;
+ protected AssignmentMetadataStore _assignmentMetadataStore;
List<MockParticipantManager> _participants = new ArrayList<>();
Map<String, String> _nodeToTagMap = new HashMap<>();
@@ -103,6 +106,23 @@ public class TestWagedRebalance extends ZkTestBase {
_controller.syncStart();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ // It's a hacky way to workaround the package restriction. Note that we still want to hide the
+ // AssignmentMetadataStore constructor to prevent unexpected update to the assignment records.
+ _assignmentMetadataStore =
+ new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) {
+ public Map<String, ResourceAssignment> getBaseline() {
+ // Ensure this metadata store always read from the ZK without using cache.
+ super.reset();
+ return super.getBaseline();
+ }
+
+ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
+ // Ensure this metadata store always read from the ZK without using cache.
+ super.reset();
+ return super.getBestPossibleAssignment();
+ }
+ };
}
protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {
@@ -118,7 +138,8 @@ public class TestWagedRebalance extends ZkTestBase {
int i = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -315,8 +336,7 @@ public class TestWagedRebalance extends ZkTestBase {
Thread.sleep(300);
validate(newReplicaFactor);
- ev =
- _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+ ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 2);
}
@@ -412,7 +432,6 @@ public class TestWagedRebalance extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
_gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
-
}
int j = 0;
@@ -490,7 +509,8 @@ public class TestWagedRebalance extends ZkTestBase {
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
idealState.setMaxPartitionsPerInstance(1);
- _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState);
+ _gSetupTool.getClusterManagementTool()
+ .setResourceIdealState(CLUSTER_NAME, db, idealState);
}
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
@@ -546,7 +566,8 @@ public class TestWagedRebalance extends ZkTestBase {
int i = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -653,6 +674,36 @@ public class TestWagedRebalance extends ZkTestBase {
Assert.assertFalse(newEV.equals(oldEV));
}
+ @Test(dependsOnMethods = "test")
+ public void testRecreateSameResource() throws InterruptedException {
+ String dbName = "Test-DB-" + TestHelper.getTestMethodName();
+ createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+ _allDBs.add(dbName);
+ // Wait for the newly created DB to be rebalanced.
+ Thread.sleep(300);
+ validate(_replica);
+
+ // Record the current Ideal State and Resource Config for recreating.
+ IdealState is =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+
+ // Drop the DB; its Ideal State was recorded above so that it can be recreated.
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName);
+ _allDBs.remove(dbName);
+ // waiting for the DBs being dropped.
+ Thread.sleep(100);
+ validate(_replica);
+
+ // Recreate the DB.
+ _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, is);
+ _allDBs.add(dbName);
+ // waiting for the DBs to be recreated.
+ Thread.sleep(100);
+ validate(_replica);
+ }
+
private void validate(int expectedReplica) {
HelixClusterVerifier _clusterVerifier =
new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -665,6 +716,7 @@ public class TestWagedRebalance extends ZkTestBase {
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
validateIsolation(is, ev, expectedReplica);
}
+ _clusterVerifier.close();
}
/**
@@ -702,6 +754,12 @@ public class TestWagedRebalance extends ZkTestBase {
} finally {
_clusterVerifier.close();
}
+
+ // Verify the DBs are all removed and the persisted assignment records are cleaned up.
+ Assert.assertEquals(
+ _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME).size(), 0);
+ Assert.assertTrue(_assignmentMetadataStore.getBestPossibleAssignment().isEmpty());
+ Assert.assertTrue(_assignmentMetadataStore.getBaseline().isEmpty());
}
@AfterClass