You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/30 22:41:23 UTC

[helix] branch master updated: Cleanup the persisted assignment state if no resource is on WAGED rebalancer. (#1123)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b98a6e  Cleanup the persisted assignment state if no resource is on WAGED rebalancer. (#1123)
0b98a6e is described below

commit 0b98a6e42b2e814d388cd22e53911a43212de05a
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Jun 30 15:41:15 2020 -0700

    Cleanup the persisted assignment state if no resource is on WAGED rebalancer. (#1123)
    
    This is to prevent the WAGED rebalancer from reading stale assignment records left by a previous rebalance pipeline.
    For example,
    
    1. Resource A is the only resource and it is rebalanced by WAGED, so we have a persisted assignment for A.
    2. Resource A is reconfigured to use DelayedRebalancer, so we stop the WAGED rebalancer since no resource is using WAGED anymore. However, the persisted records are still in ZK.
    3. Resource A is recreated and uses WAGED again. In this case, the previously persisted assignment is no longer valid. We should treat A as a brand new resource instead of considering the stale assignment record.
    
    Moreover, this change will help to clean up the ZK persisted data if no resource is using WAGED.
---
 .../rebalancer/waged/AssignmentMetadataStore.java  |  67 +++++------
 .../rebalancer/waged/WagedRebalancer.java          |  19 ++-
 .../waged/TestAssignmentMetadataStore.java         | 127 +++++++++++++--------
 .../rebalancer/waged/TestWagedRebalancer.java      |   9 ++
 4 files changed, 140 insertions(+), 82 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 381b612..fffa0ba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -27,10 +27,10 @@ import java.util.Map;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 
@@ -70,7 +70,7 @@ public class AssignmentMetadataStore {
         _globalBaseline = splitAssignments(baseline);
       } catch (ZkNoNodeException ex) {
         // Metadata does not exist, so return an empty map
-        _globalBaseline = Collections.emptyMap();
+        _globalBaseline = new HashMap<>();
       }
     }
     return _globalBaseline;
@@ -85,7 +85,7 @@ public class AssignmentMetadataStore {
         _bestPossibleAssignment = splitAssignments(baseline);
       } catch (ZkNoNodeException ex) {
         // Metadata does not exist, so return an empty map
-        _bestPossibleAssignment = Collections.emptyMap();
+        _bestPossibleAssignment = new HashMap<>();
       }
     }
     return _bestPossibleAssignment;
@@ -95,53 +95,56 @@ public class AssignmentMetadataStore {
    * @return true if a new baseline was persisted.
    * @throws HelixException if the method failed to persist the baseline.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
-  // TODO: when it is skipped.
   public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    // TODO: Make the write async?
-    // If baseline hasn't changed, skip writing to metadata store
-    if (compareAssignments(_globalBaseline, globalBaseline)) {
-      return false;
-    }
-    // Persist to ZK
-    HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
-    try {
-      _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
-    } catch (IOException e) {
-      // TODO: Improve failure handling
-      throw new HelixException("Failed to persist baseline!", e);
-    }
-
-    // Update the in-memory reference
-    _globalBaseline = globalBaseline;
-    return true;
+    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
   }
 
   /**
    * @return true if a new best possible assignment was persisted.
    * @throws HelixException if the method failed to persist the baseline.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
-  // TODO: when it is skipped.
   public synchronized boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
+    return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
+        BEST_POSSIBLE_KEY);
+  }
+
+  public synchronized void clearAssignmentMetadata() {
+    persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
+    persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
+        BEST_POSSIBLE_KEY);
+  }
+
+  /**
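+   * @param newAssignment the assignment to be persisted
+   * @param cachedAssignment the in-memory copy; compared with newAssignment and updated in place after a successful write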
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
+   * @return true if a new assignment was persisted.
+   */
+  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
+  // TODO: when it is skipped.
+  private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> cachedAssignment, String path,
+      String key) {
     // TODO: Make the write async?
-    // If bestPossibleAssignment hasn't changed, skip writing to metadata store
-    if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+    // If the assignment hasn't changed, skip writing to metadata store
+    if (compareAssignments(cachedAssignment, newAssignment)) {
       return false;
     }
     // Persist to ZK
-    HelixProperty combinedAssignments =
-        combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
     try {
-      _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
     } catch (IOException e) {
       // TODO: Improve failure handling
-      throw new HelixException("Failed to persist BestPossibleAssignment!", e);
+      throw new HelixException(
+          String.format("Failed to persist %s assignment to path %s", key, path), e);
     }
 
     // Update the in-memory reference
-    _bestPossibleAssignment = bestPossibleAssignment;
+    cachedAssignment.clear();
+    cachedAssignment.putAll(newAssignment);
     return true;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 86c4d09..22955fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -254,7 +254,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
     if (resourceMap.isEmpty()) {
-      LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName());
+      LOG.debug(
+          "There is no resource to be rebalanced by {}. Reset the persisted assignment state if any.",
+          this.getClass().getSimpleName());
+      // Clean up the persisted assignment records so if the resources are added back to WAGED, they
+      // will be recalculated as new resources.
+      clearAssignmentMetadata();
       return Collections.emptyMap();
     }
 
@@ -463,6 +468,18 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
+  private void clearAssignmentMetadata() {
+    if (_assignmentMetadataStore != null) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.clearAssignmentMetadata();
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        LOG.error("Failed to clear the assignment metadata.", ex);
+      }
+    }
+  }
+
   /**
    * Calculate and update the Baseline assignment
    * @param clusterModel
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 3237420..79a9f06 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +29,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
@@ -37,58 +37,34 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestAssignmentMetadataStore extends ZkTestBase {
-  protected static final int NODE_NR = 5;
-  protected static final int START_PORT = 12918;
-  protected static final String STATE_MODEL = "MasterSlave";
-  protected static final String TEST_DB = "TestDB";
-  protected static final int _PARTITIONS = 20;
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+  private static final String BASELINE_KEY = "BASELINE";
+  private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
 
+  protected static final String TEST_DB = "TestDB";
   protected HelixManager _manager;
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
-  protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
-  protected ClusterControllerManager _controller;
-  protected int _replica = 3;
-
   private AssignmentMetadataStore _store;
 
   @BeforeClass
-  public void beforeClass()
-      throws Exception {
+  public void beforeClass() throws Exception {
     super.beforeClass();
 
     // setup storage cluster
     _gSetupTool.addCluster(CLUSTER_NAME, true);
-    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-    for (int i = 0; i < NODE_NR; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
-
-    // start dummy participants
-    for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
 
     // create cluster manager
     _manager = HelixManagerFactory
         .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
 
-    // create AssignmentMetadataStore
-    _store = new AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(),
-        _manager.getClusterName());
+    // Create AssignmentMetadataStore. No version clean up to ensure the test result is stable.
+    _store = new AssignmentMetadataStore(
+        new ZkBucketDataAccessor(_manager.getMetadataStoreConnectionString(), DEFAULT_BUCKET_SIZE,
+            Integer.MAX_VALUE), _manager.getClusterName());
   }
 
   @AfterClass
@@ -96,6 +72,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     if (_store != null) {
       _store.close();
     }
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
   }
 
   /**
@@ -107,8 +84,13 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
    */
   @Test
   public void testReadEmptyBaseline() {
-    Map<String, ResourceAssignment> baseline = _store.getBaseline();
-    Assert.assertTrue(baseline.isEmpty());
+    // This should be the first test. Assert there is no record in ZK.
+    // Check that no version exists yet
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 0);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 0);
+    // Read from cache and the result is empty.
+    Assert.assertTrue(_store.getBaseline().isEmpty());
+    Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
   }
 
   /**
@@ -116,9 +98,6 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
    */
   @Test(dependsOnMethods = "testReadEmptyBaseline")
   public void testAvoidingRedundantWrite() {
-    String baselineKey = "BASELINE";
-    String bestPossibleKey = "BEST_POSSIBLE";
-
     Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
 
     // Call persist functions
@@ -126,36 +105,82 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     _store.persistBestPossibleAssignment(dummyAssignment);
 
     // Check that only one version exists
-    List<String> baselineVersions = getExistingVersionNumbers(baselineKey);
-    List<String> bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
-    Assert.assertEquals(baselineVersions.size(), 1);
-    Assert.assertEquals(bestPossibleVersions.size(), 1);
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
 
     // Call persist functions again
     _store.persistBaseline(dummyAssignment);
     _store.persistBestPossibleAssignment(dummyAssignment);
 
     // Check that only one version exists still
-    baselineVersions = getExistingVersionNumbers(baselineKey);
-    bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
-    Assert.assertEquals(baselineVersions.size(), 1);
-    Assert.assertEquals(bestPossibleVersions.size(), 1);
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
   }
 
-  @Test
+  @Test(dependsOnMethods = "testAvoidingRedundantWrite")
   public void testAssignmentCache() {
     Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
     // Call persist functions
     _store.persistBaseline(dummyAssignment);
     _store.persistBestPossibleAssignment(dummyAssignment);
 
+    // Check that only one version exists
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
+
+    // Same data in cache
     Assert.assertEquals(_store._bestPossibleAssignment, dummyAssignment);
     Assert.assertEquals(_store._globalBaseline, dummyAssignment);
 
+    dummyAssignment.values().stream().forEach(assignment -> {
+      assignment.addReplicaMap(new Partition("foo"), Collections.emptyMap());
+    });
+
+    // Call persist functions
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    // Check that two versions exist
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 2);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 2);
+
+    // Same data in cache
+    Assert.assertEquals(_store._bestPossibleAssignment, dummyAssignment);
+    Assert.assertEquals(_store._globalBaseline, dummyAssignment);
+
+    // Clear cache
     _store.reset();
 
     Assert.assertEquals(_store._bestPossibleAssignment, null);
     Assert.assertEquals(_store._globalBaseline, null);
+
+    // Check the persisted data is not changed.
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 2);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 2);
+  }
+
+  @Test(dependsOnMethods = "testAssignmentCache")
+  void testClearAssignment() {
+    // Check the persisted data is not empty
+    List<String> baselineVersions = getExistingVersionNumbers(BASELINE_KEY);
+    List<String> bestPossibleVersions = getExistingVersionNumbers(BEST_POSSIBLE_KEY);
+    int baselineVersionCount = baselineVersions.size();
+    int bestPossibleVersionCount = bestPossibleVersions.size();
+    Assert.assertTrue(baselineVersionCount > 0);
+    Assert.assertTrue(bestPossibleVersionCount > 0);
+
+    _store.clearAssignmentMetadata();
+
+    // 1. cache is cleaned up
+    Assert.assertEquals(_store._bestPossibleAssignment, Collections.emptyMap());
+    Assert.assertEquals(_store._globalBaseline, Collections.emptyMap());
+    // 2. refresh the cache and then read from ZK again to ensure the persisted assignments are empty
+    _store.reset();
+    Assert.assertEquals(_store.getBaseline(), Collections.emptyMap());
+    Assert.assertEquals(_store.getBestPossibleAssignment(), Collections.emptyMap());
+    // 3. check that the clear was persisted as exactly one new version of each record
+    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), baselineVersionCount + 1);
+    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), bestPossibleVersionCount + 1);
   }
 
   private Map<String, ResourceAssignment> getDummyAssignment() {
@@ -172,6 +197,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
 
   /**
    * Returns a list of existing version numbers only.
+   *
    * @param metadataType
    * @return
    */
@@ -179,6 +205,9 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     List<String> children = _baseAccessor
         .getChildNames("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/" + metadataType,
             AccessOption.PERSISTENT);
+    if (children == null) {
+      children = Collections.EMPTY_LIST;
+    }
     children.remove("LAST_SUCCESSFUL_WRITE");
     children.remove("LAST_WRITE");
     return children;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 8e54cd7..47aa081 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -139,6 +139,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Since there is no special condition, the calculated IdealStates should be exactly the same
     // as the mock algorithm result.
     validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+
+    Assert.assertFalse(_metadataStore.getBaseline().isEmpty());
+    Assert.assertFalse(_metadataStore.getBestPossibleAssignment().isEmpty());
+    // Calculate with empty resource list. The rebalancer shall clean up all the assignment status.
+    newIdealStates = rebalancer
+        .computeNewIdealStates(clusterData, Collections.emptyMap(), new CurrentStateOutput());
+    Assert.assertTrue(newIdealStates.isEmpty());
+    Assert.assertTrue(_metadataStore.getBaseline().isEmpty());
+    Assert.assertTrue(_metadataStore.getBestPossibleAssignment().isEmpty());
   }
 
   @Test(dependsOnMethods = "testRebalance")