You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/05 02:04:39 UTC

[helix] branch wagedRebalancer updated: Avoid redundant writes in AssignmentMetadataStore (#564)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 29a62c7  Avoid redundant writes in AssignmentMetadataStore (#564)
29a62c7 is described below

commit 29a62c7f65c6f93544e3135af14f1f759922ce34
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Nov 4 18:04:34 2019 -0800

    Avoid redundant writes in AssignmentMetadataStore (#564)
    
    For the WAGED rebalancer, we persist the cluster's mapping via AssignmentMetadataStore every pipeline. However, if there are no changes made to the new assignment from the old assignment, this write is not necessary. This diff checks whether they are equal and skips the write if old and new assignments are the same.
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 25 ++++++++-
 .../waged/TestAssignmentMetadataStore.java         | 64 +++++++++++++++++++++-
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index a540ffb..234c88c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -94,6 +94,10 @@ public class AssignmentMetadataStore {
 
   public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     // TODO: Make the write async?
+    // If baseline hasn't changed, skip writing to metadata store
+    if (compareAssignments(_globalBaseline, globalBaseline)) {
+      return;
+    }
     // Persist to ZK
     HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
     try {
@@ -110,14 +114,18 @@ public class AssignmentMetadataStore {
   public void persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // TODO: Make the write async?
-    // Persist to ZK asynchronously
+    // If bestPossibleAssignment hasn't changed, skip writing to metadata store
+    if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+      return;
+    }
+    // Persist to ZK
     HelixProperty combinedAssignments =
         combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
     try {
       _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
     } catch (IOException e) {
       // TODO: Improve failure handling
-      throw new HelixException("Failed to persist baseline!", e);
+      throw new HelixException("Failed to persist BestPossibleAssignment!", e);
     }
 
     // Update the in-memory reference
@@ -164,4 +172,17 @@ public class AssignmentMetadataStore {
             new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
     return assignmentMap;
   }
+
+  /**
+   * Returns whether two assignments are same.
+   * @param oldAssignment
+   * @param newAssignment
+   * @return true if they are the same. False otherwise or oldAssignment is null
+   */
+  private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
+      Map<String, ResourceAssignment> newAssignment) {
+    // If oldAssignment is null, that means that we haven't read from/written to
+    // the metadata store yet. In that case, we return false so that we write to metadata store.
+    return oldAssignment != null && oldAssignment.equals(newAssignment);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index ecd2af3..59326e7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -19,20 +19,25 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 public class TestAssignmentMetadataStore extends ZkTestBase {
   protected static final int NODE_NR = 5;
   protected static final int START_PORT = 12918;
@@ -51,7 +56,8 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
   private AssignmentMetadataStore _store;
 
   @BeforeClass
-  public void beforeClass() throws Exception {
+  public void beforeClass()
+      throws Exception {
     super.beforeClass();
 
     // setup storage cluster
@@ -76,8 +82,8 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     _controller.syncStart();
 
     // create cluster manager
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
 
     // create AssignmentMetadataStore
@@ -104,4 +110,56 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     Map<String, ResourceAssignment> baseline = _store.getBaseline();
     Assert.assertTrue(baseline.isEmpty());
   }
+
+  /**
+   * Test that if the old assignment and new assignment are the same,
+   */
+  @Test(dependsOnMethods = "testReadEmptyBaseline")
+  public void testAvoidingRedundantWrite() {
+    String baselineKey = "BASELINE";
+    String bestPossibleKey = "BEST_POSSIBLE";
+
+    // Generate a dummy assignment
+    Map<String, ResourceAssignment> dummyAssignment = new HashMap<>();
+    ResourceAssignment assignment = new ResourceAssignment(TEST_DB);
+    Partition partition = new Partition(TEST_DB);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(TEST_DB, TEST_DB);
+    assignment.addReplicaMap(partition, replicaMap);
+    dummyAssignment.put(TEST_DB, new ResourceAssignment(TEST_DB));
+
+    // Call persist functions
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    // Check that only one version exists
+    List<String> baselineVersions = getExistingVersionNumbers(baselineKey);
+    List<String> bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
+    Assert.assertEquals(baselineVersions.size(), 1);
+    Assert.assertEquals(bestPossibleVersions.size(), 1);
+
+    // Call persist functions again
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    // Check that only one version exists still
+    baselineVersions = getExistingVersionNumbers(baselineKey);
+    bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
+    Assert.assertEquals(baselineVersions.size(), 1);
+    Assert.assertEquals(bestPossibleVersions.size(), 1);
+  }
+
+  /**
+   * Returns a list of existing version numbers only.
+   * @param metadataType
+   * @return
+   */
+  private List<String> getExistingVersionNumbers(String metadataType) {
+    List<String> children = _baseAccessor
+        .getChildNames("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/" + metadataType,
+            AccessOption.PERSISTENT);
+    children.remove("LAST_SUCCESSFUL_WRITE");
+    children.remove("LAST_WRITE");
+    return children;
+  }
 }