You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/08/14 23:48:03 UTC

[helix] branch master updated: Don't skip WAGED rebalancer calculation even the resource list is empty. (#1271)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new a901c20  Don't skip WAGED rebalancer calculation even the resource list is empty. (#1271)
a901c20 is described below

commit a901c201a00d4b96294020ef82bc113fea196e8a
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Fri Aug 14 16:47:57 2020 -0700

    Don't skip WAGED rebalancer calculation even the resource list is empty. (#1271)
    
    This change allows the rebalancer logic to clean up the internal assignment state and cache if all the resources are removed from the cluster.
---
 .../rebalancer/waged/WagedRebalancer.java          | 22 -------
 .../stages/BestPossibleStateCalcStage.java         |  2 +-
 .../rebalancer/waged/TestWagedRebalancer.java      |  3 +
 .../WagedRebalancer/TestWagedRebalance.java        | 70 ++++++++++++++++++++--
 4 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e1f5d4a..89ecc47 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -253,16 +253,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
   public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
-    if (resourceMap.isEmpty()) {
-      LOG.debug(
-          "There is no resource to be rebalanced by {}. Reset the persisted assignment state if any.",
-          this.getClass().getSimpleName());
-      // Clean up the persisted assignment records so if the resources are added back to WAGED, they
-      // will be recalculated as a new one.
-      clearAssignmentMetadata();
-      return Collections.emptyMap();
-    }
-
     LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
     validateInput(clusterData, resourceMap);
 
@@ -468,18 +458,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private void clearAssignmentMetadata() {
-    if (_assignmentMetadataStore != null) {
-      try {
-        _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.clearAssignmentMetadata();
-        _writeLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        LOG.error("Failed to clear the assignment metadata.", ex);
-      }
-    }
-  }
-
   /**
    * Calculate and update the Baseline assignment
    * @param clusterModel
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 32d3b42..bca7a37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -292,7 +292,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       } else {
         failureResources.add(resource.getResourceName());
         LogUtil.logWarn(logger, _eventId, String
-            .format("Failed to calculate best possible states for %s.",
+            .format("The calculated best possible states for %s is empty or invalid.",
                 resource.getResourceName()));
       }
     }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index fe5fb02..cfcbe29 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -150,6 +150,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Assert.assertFalse(_metadataStore.getBaseline().isEmpty());
     Assert.assertFalse(_metadataStore.getBestPossibleAssignment().isEmpty());
     // Calculate with empty resource list. The rebalancer shall clean up all the assignment status.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
+    clusterData.getIdealStates().clear();
     newIdealStates = rebalancer
         .computeNewIdealStates(clusterData, Collections.emptyMap(), new CurrentStateOutput());
     Assert.assertTrue(newIdealStates.isEmpty());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index c232462..b778b3c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -36,9 +36,11 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
@@ -66,6 +68,7 @@ public class TestWagedRebalance extends ZkTestBase {
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
   protected ClusterControllerManager _controller;
+  protected AssignmentMetadataStore _assignmentMetadataStore;
 
   List<MockParticipantManager> _participants = new ArrayList<>();
   Map<String, String> _nodeToTagMap = new HashMap<>();
@@ -103,6 +106,23 @@ public class TestWagedRebalance extends ZkTestBase {
     _controller.syncStart();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // It's a hacky way to work around the package restriction. Note that we still want to hide the
+    // AssignmentMetadataStore constructor to prevent unexpected updates to the assignment records.
+    _assignmentMetadataStore =
+        new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) {
+          public Map<String, ResourceAssignment> getBaseline() {
+            // Ensure this metadata store always reads from ZK without using the cache.
+            super.reset();
+            return super.getBaseline();
+          }
+
+          public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
+            // Ensure this metadata store always reads from ZK without using the cache.
+            super.reset();
+            return super.getBestPossibleAssignment();
+          }
+        };
   }
 
   protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {
@@ -118,7 +138,8 @@ public class TestWagedRebalance extends ZkTestBase {
     int i = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
-      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -315,8 +336,7 @@ public class TestWagedRebalance extends ZkTestBase {
     Thread.sleep(300);
 
     validate(newReplicaFactor);
-    ev =
-        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+    ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
     Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 2);
   }
 
@@ -412,7 +432,6 @@ public class TestWagedRebalance extends ZkTestBase {
       _gSetupTool.getClusterManagementTool()
           .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
       _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
-
     }
 
     int j = 0;
@@ -490,7 +509,8 @@ public class TestWagedRebalance extends ZkTestBase {
           IdealState idealState =
               _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
           idealState.setMaxPartitionsPerInstance(1);
-          _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState);
+          _gSetupTool.getClusterManagementTool()
+              .setResourceIdealState(CLUSTER_NAME, db, idealState);
         }
         _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
         _allDBs.add(db);
@@ -546,7 +566,8 @@ public class TestWagedRebalance extends ZkTestBase {
     int i = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
-      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -653,6 +674,36 @@ public class TestWagedRebalance extends ZkTestBase {
     Assert.assertFalse(newEV.equals(oldEV));
   }
 
+  @Test(dependsOnMethods = "test")
+  public void testRecreateSameResource() throws InterruptedException {
+    String dbName = "Test-DB-" + TestHelper.getTestMethodName();
+    createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+    _allDBs.add(dbName);
+    // Wait for the DB to be created and rebalanced.
+    Thread.sleep(300);
+    validate(_replica);
+
+    // Record the current IdealState for recreating the resource.
+    IdealState is =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+
+    // Drop the DB; its IdealState was recorded above.
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName);
+    _allDBs.remove(dbName);
+    // Wait for the DB to be dropped.
+    Thread.sleep(100);
+    validate(_replica);
+
+    // Recreate the DB.
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, is);
+    _allDBs.add(dbName);
+    // waiting for the DBs to be recreated.
+    Thread.sleep(100);
+    validate(_replica);
+  }
+
   private void validate(int expectedReplica) {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -665,6 +716,7 @@ public class TestWagedRebalance extends ZkTestBase {
           _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       validateIsolation(is, ev, expectedReplica);
     }
+    _clusterVerifier.close();
   }
 
   /**
@@ -702,6 +754,12 @@ public class TestWagedRebalance extends ZkTestBase {
     } finally {
       _clusterVerifier.close();
     }
+
+    // Verify the DBs are all removed and the persisted assignment records are cleaned up.
+    Assert.assertEquals(
+        _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME).size(), 0);
+    Assert.assertTrue(_assignmentMetadataStore.getBestPossibleAssignment().isEmpty());
+    Assert.assertTrue(_assignmentMetadataStore.getBaseline().isEmpty());
   }
 
   @AfterClass