You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/30 22:41:23 UTC
[helix] branch master updated: Cleanup the persisted assignment
state if no resource is on WAGED rebalancer. (#1123)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 0b98a6e Cleanup the persisted assignment state if no resource is on WAGED rebalancer. (#1123)
0b98a6e is described below
commit 0b98a6e42b2e814d388cd22e53911a43212de05a
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Jun 30 15:41:15 2020 -0700
Cleanup the persisted assignment state if no resource is on WAGED rebalancer. (#1123)
This prevents the WAGED rebalancer from reading stale assignment records left over from a previous rebalance pipeline.
For example,
1. Resource A is the only resource, and it is rebalanced by WAGED, so a persisted assignment exists for A.
2. Resource A is reconfigured to use the DelayedRebalancer. The WAGED rebalancer then stops, since no resource uses WAGED anymore, but the persisted records remain in ZK.
3. Resource A is recreated and uses WAGED again. The previously persisted assignment is no longer valid, so A should be treated as a brand-new resource rather than reusing the stale assignment record.
Moreover, this change cleans up the assignment data persisted in ZK whenever no resource is using WAGED.
---
.../rebalancer/waged/AssignmentMetadataStore.java | 67 +++++------
.../rebalancer/waged/WagedRebalancer.java | 19 ++-
.../waged/TestAssignmentMetadataStore.java | 127 +++++++++++++--------
.../rebalancer/waged/TestWagedRebalancer.java | 9 ++
4 files changed, 140 insertions(+), 82 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 381b612..fffa0ba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -27,10 +27,10 @@ import java.util.Map;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -70,7 +70,7 @@ public class AssignmentMetadataStore {
_globalBaseline = splitAssignments(baseline);
} catch (ZkNoNodeException ex) {
// Metadata does not exist, so return an empty map
- _globalBaseline = Collections.emptyMap();
+ _globalBaseline = new HashMap<>();
}
}
return _globalBaseline;
@@ -85,7 +85,7 @@ public class AssignmentMetadataStore {
_bestPossibleAssignment = splitAssignments(baseline);
} catch (ZkNoNodeException ex) {
// Metadata does not exist, so return an empty map
- _bestPossibleAssignment = Collections.emptyMap();
+ _bestPossibleAssignment = new HashMap<>();
}
}
return _bestPossibleAssignment;
@@ -95,53 +95,56 @@ public class AssignmentMetadataStore {
* @return true if a new baseline was persisted.
* @throws HelixException if the method failed to persist the baseline.
*/
- // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
- // TODO: when it is skipped.
public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
- // TODO: Make the write async?
- // If baseline hasn't changed, skip writing to metadata store
- if (compareAssignments(_globalBaseline, globalBaseline)) {
- return false;
- }
- // Persist to ZK
- HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
- try {
- _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
- } catch (IOException e) {
- // TODO: Improve failure handling
- throw new HelixException("Failed to persist baseline!", e);
- }
-
- // Update the in-memory reference
- _globalBaseline = globalBaseline;
- return true;
+ return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
}
/**
* @return true if a new best possible assignment was persisted.
* @throws HelixException if the method failed to persist the baseline.
*/
- // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
- // TODO: when it is skipped.
public synchronized boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
+ return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
+ BEST_POSSIBLE_KEY);
+ }
+
+ public synchronized void clearAssignmentMetadata() {
+ persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
+ persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
+ BEST_POSSIBLE_KEY);
+ }
+
+ /**
+ * @param newAssignment
+ * @param cachedAssignment
+ * @param path the path of the assignment record
+ * @param key the key of the assignment in the record
+ * @return true if a new assignment was persisted.
+ */
+ // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
+ // TODO: when it is skipped.
+ private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
+ Map<String, ResourceAssignment> cachedAssignment, String path,
+ String key) {
// TODO: Make the write async?
- // If bestPossibleAssignment hasn't changed, skip writing to metadata store
- if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+ // If the assignment hasn't changed, skip writing to metadata store
+ if (compareAssignments(cachedAssignment, newAssignment)) {
return false;
}
// Persist to ZK
- HelixProperty combinedAssignments =
- combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
+ HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
try {
- _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
+ _dataAccessor.compressedBucketWrite(path, combinedAssignments);
} catch (IOException e) {
// TODO: Improve failure handling
- throw new HelixException("Failed to persist BestPossibleAssignment!", e);
+ throw new HelixException(
+ String.format("Failed to persist %s assignment to path %s", key, path), e);
}
// Update the in-memory reference
- _bestPossibleAssignment = bestPossibleAssignment;
+ cachedAssignment.clear();
+ cachedAssignment.putAll(newAssignment);
return true;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 86c4d09..22955fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -254,7 +254,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
if (resourceMap.isEmpty()) {
- LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName());
+ LOG.debug(
+ "There is no resource to be rebalanced by {}. Reset the persisted assignment state if any.",
+ this.getClass().getSimpleName());
+ // Clean up the persisted assignment records so if the resources are added back to WAGED, they
+ // will be recalculated as a new one.
+ clearAssignmentMetadata();
return Collections.emptyMap();
}
@@ -463,6 +468,18 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
+ private void clearAssignmentMetadata() {
+ if (_assignmentMetadataStore != null) {
+ try {
+ _writeLatency.startMeasuringLatency();
+ _assignmentMetadataStore.clearAssignmentMetadata();
+ _writeLatency.endMeasuringLatency();
+ } catch (Exception ex) {
+ LOG.error("Failed to clear the assignment metadata.", ex);
+ }
+ }
+ }
+
/**
* Calculate and update the Baseline assignment
* @param clusterModel
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 3237420..79a9f06 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,8 +29,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.testng.Assert;
@@ -37,58 +37,34 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
public class TestAssignmentMetadataStore extends ZkTestBase {
- protected static final int NODE_NR = 5;
- protected static final int START_PORT = 12918;
- protected static final String STATE_MODEL = "MasterSlave";
- protected static final String TEST_DB = "TestDB";
- protected static final int _PARTITIONS = 20;
+ private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+ private static final String BASELINE_KEY = "BASELINE";
+ private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
+ protected static final String TEST_DB = "TestDB";
protected HelixManager _manager;
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
- protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
- protected ClusterControllerManager _controller;
- protected int _replica = 3;
-
private AssignmentMetadataStore _store;
@BeforeClass
- public void beforeClass()
- throws Exception {
+ public void beforeClass() throws Exception {
super.beforeClass();
// setup storage cluster
_gSetupTool.addCluster(CLUSTER_NAME, true);
- _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
- for (int i = 0; i < NODE_NR; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
- _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
-
- // start dummy participants
- for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
// create cluster manager
_manager = HelixManagerFactory
.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
_manager.connect();
- // create AssignmentMetadataStore
- _store = new AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(),
- _manager.getClusterName());
+ // Create AssignmentMetadataStore. No version clean up to ensure the test result is stable.
+ _store = new AssignmentMetadataStore(
+ new ZkBucketDataAccessor(_manager.getMetadataStoreConnectionString(), DEFAULT_BUCKET_SIZE,
+ Integer.MAX_VALUE), _manager.getClusterName());
}
@AfterClass
@@ -96,6 +72,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
if (_store != null) {
_store.close();
}
+ _gSetupTool.deleteCluster(CLUSTER_NAME);
}
/**
@@ -107,8 +84,13 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
*/
@Test
public void testReadEmptyBaseline() {
- Map<String, ResourceAssignment> baseline = _store.getBaseline();
- Assert.assertTrue(baseline.isEmpty());
+ // This should be the first test. Assert there is no record in ZK.
+ // Check that only one version exists
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 0);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 0);
+ // Read from cache and the result is empty.
+ Assert.assertTrue(_store.getBaseline().isEmpty());
+ Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
}
/**
@@ -116,9 +98,6 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
*/
@Test(dependsOnMethods = "testReadEmptyBaseline")
public void testAvoidingRedundantWrite() {
- String baselineKey = "BASELINE";
- String bestPossibleKey = "BEST_POSSIBLE";
-
Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
// Call persist functions
@@ -126,36 +105,82 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
_store.persistBestPossibleAssignment(dummyAssignment);
// Check that only one version exists
- List<String> baselineVersions = getExistingVersionNumbers(baselineKey);
- List<String> bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
- Assert.assertEquals(baselineVersions.size(), 1);
- Assert.assertEquals(bestPossibleVersions.size(), 1);
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
// Call persist functions again
_store.persistBaseline(dummyAssignment);
_store.persistBestPossibleAssignment(dummyAssignment);
// Check that only one version exists still
- baselineVersions = getExistingVersionNumbers(baselineKey);
- bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
- Assert.assertEquals(baselineVersions.size(), 1);
- Assert.assertEquals(bestPossibleVersions.size(), 1);
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
}
- @Test
+ @Test(dependsOnMethods = "testAvoidingRedundantWrite")
public void testAssignmentCache() {
Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
// Call persist functions
_store.persistBaseline(dummyAssignment);
_store.persistBestPossibleAssignment(dummyAssignment);
+ // Check that only one version exists
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
+
+ // Same data in cache
Assert.assertEquals(_store._bestPossibleAssignment, dummyAssignment);
Assert.assertEquals(_store._globalBaseline, dummyAssignment);
+ dummyAssignment.values().stream().forEach(assignment -> {
+ assignment.addReplicaMap(new Partition("foo"), Collections.emptyMap());
+ });
+
+ // Call persist functions
+ _store.persistBaseline(dummyAssignment);
+ _store.persistBestPossibleAssignment(dummyAssignment);
+
+ // Check that two versions exist
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 2);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 2);
+
+ // Same data in cache
+ Assert.assertEquals(_store._bestPossibleAssignment, dummyAssignment);
+ Assert.assertEquals(_store._globalBaseline, dummyAssignment);
+
+ // Clear cache
_store.reset();
Assert.assertEquals(_store._bestPossibleAssignment, null);
Assert.assertEquals(_store._globalBaseline, null);
+
+ // Check the persisted data is not changed.
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 2);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 2);
+ }
+
+ @Test(dependsOnMethods = "testAssignmentCache")
+ void testClearAssignment() {
+ // Check the persisted data is not empty
+ List<String> baselineVersions = getExistingVersionNumbers(BASELINE_KEY);
+ List<String> bestPossibleVersions = getExistingVersionNumbers(BEST_POSSIBLE_KEY);
+ int baselineVersionCount = baselineVersions.size();
+ int bestPossibleVersionCount = bestPossibleVersions.size();
+ Assert.assertTrue(baselineVersionCount > 0);
+ Assert.assertTrue(bestPossibleVersionCount > 0);
+
+ _store.clearAssignmentMetadata();
+
+ // 1. cache is cleaned up
+ Assert.assertEquals(_store._bestPossibleAssignment, Collections.emptyMap());
+ Assert.assertEquals(_store._globalBaseline, Collections.emptyMap());
+ // 2. refresh the cache and then read from ZK again to ensure the persisted assignments is empty
+ _store.reset();
+ Assert.assertEquals(_store.getBaseline(), Collections.emptyMap());
+ Assert.assertEquals(_store.getBestPossibleAssignment(), Collections.emptyMap());
+ // 3. check that there is
+ Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), baselineVersionCount + 1);
+ Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), bestPossibleVersionCount + 1);
}
private Map<String, ResourceAssignment> getDummyAssignment() {
@@ -172,6 +197,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
/**
* Returns a list of existing version numbers only.
+ *
* @param metadataType
* @return
*/
@@ -179,6 +205,9 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
List<String> children = _baseAccessor
.getChildNames("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/" + metadataType,
AccessOption.PERSISTENT);
+ if (children == null) {
+ children = Collections.EMPTY_LIST;
+ }
children.remove("LAST_SUCCESSFUL_WRITE");
children.remove("LAST_WRITE");
return children;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 8e54cd7..47aa081 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -139,6 +139,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Since there is no special condition, the calculated IdealStates should be exactly the same
// as the mock algorithm result.
validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+
+ Assert.assertFalse(_metadataStore.getBaseline().isEmpty());
+ Assert.assertFalse(_metadataStore.getBestPossibleAssignment().isEmpty());
+ // Calculate with empty resource list. The rebalancer shall clean up all the assignment status.
+ newIdealStates = rebalancer
+ .computeNewIdealStates(clusterData, Collections.emptyMap(), new CurrentStateOutput());
+ Assert.assertTrue(newIdealStates.isEmpty());
+ Assert.assertTrue(_metadataStore.getBaseline().isEmpty());
+ Assert.assertTrue(_metadataStore.getBestPossibleAssignment().isEmpty());
}
@Test(dependsOnMethods = "testRebalance")