You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/26 03:34:06 UTC

[helix] branch wagedRebalancer updated: Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 8df436b  Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)
8df436b is described below

commit 8df436b7cabc5dabb87f831d830e8838c612c3a9
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Oct 25 20:33:58 2019 -0700

    Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)
    
    * Change change detector to regular field instead of static thread-local.
    
    The rebalance has been modified to be a thread-local object. So there is no need to keep the change detector as thread-local.
    This may cause potential problems.
    In addition, in order to avoid resource leakage, implement the finalize method of the WagedRebalancer to close all connections.
---
 .../rebalancer/waged/WagedRebalancer.java          | 18 +++++++-----
 .../stages/BestPossibleStateCalcStage.java         |  2 +-
 .../rebalancer/waged/TestWagedRebalancer.java      | 34 ++++++++++++----------
 .../waged/TestWagedRebalancerMetrics.java          |  2 ++
 4 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index b455992..9dacfbe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -73,10 +73,7 @@ public class WagedRebalancer {
       ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
           HelixConstants.ChangeType.IDEAL_STATE,
           HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
-  // The cluster change detector is a stateful object.
-  // Make it static to avoid unnecessary reinitialization.
-  private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
-      new ThreadLocal<>();
+  private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
   private final AssignmentMetadataStore _assignmentMetadataStore;
@@ -149,6 +146,7 @@ public class WagedRebalancer {
     // allow rebalancer to proceed
     _metricCollector =
         metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector;
+    _changeDetector = new ResourceChangeDetector();
   }
 
   // Release all the resources.
@@ -468,10 +466,7 @@ public class WagedRebalancer {
   }
 
   private ResourceChangeDetector getChangeDetector() {
-    if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
-      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector(true));
-    }
-    return CHANGE_DETECTOR_THREAD_LOCAL.get();
+    return _changeDetector;
   }
 
   // Generate the preference lists from the state mapping based on state priority.
@@ -684,4 +679,11 @@ public class WagedRebalancer {
   protected MetricCollector getMetricCollector() {
     return _metricCollector;
   }
+
+  @Override
+  protected void finalize()
+      throws Throwable {
+    super.finalize();
+    close();
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1bd1fdf..fcf89b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -283,7 +283,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       }
     }
 
-    // Create MetricCollector ThreadLocal if it hasn't been already initialized
+    // Create WagedRebalancer ThreadLocal if it hasn't been already initialized
     if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
       WAGED_REBALANCER_THREAD_LOCAL
           .set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index abf084d..9782645 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -359,6 +359,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Cluster config change will trigger baseline to be recalculated.
     when(clusterData.getRefreshedChangeTypes())
         .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    // Update the config so the cluster config will be marked as changed.
+    clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
         .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
           Resource resource = new Resource(entry.getKey());
@@ -373,10 +375,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // as the mock algorithm result.
     validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
     Map<String, ResourceAssignment> baseline = _metadataStore.getBaseline();
-    Assert.assertEquals(baseline, algorithmResult);
+    Assert.assertEquals(algorithmResult, baseline);
     Map<String, ResourceAssignment> bestPossibleAssignment =
         _metadataStore.getBestPossibleAssignment();
-    Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+    Assert.assertEquals(algorithmResult, bestPossibleAssignment);
 
     // 2. rebalance with one resource changed in the Resource Config znode only
     String changedResourceName = _resourceNames.get(0);
@@ -398,15 +400,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     validateRebalanceResult(
         Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
         newIdealStates, partialAlgorithmResult);
-    // Baseline should be empty, because there is no cluster topology change.
+    // Best possible assignment contains the new assignment of only one resource.
     baseline = _metadataStore.getBaseline();
-    Assert.assertEquals(baseline, Collections.emptyMap());
+    Assert.assertEquals(baseline, partialAlgorithmResult);
     // Best possible assignment contains the new assignment of only one resource.
     bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
 
     // * Before the next test, recover the best possible assignment record.
     _metadataStore.persistBestPossibleAssignment(algorithmResult);
+    _metadataStore.persistBaseline(algorithmResult);
 
     // 3. rebalance with current state change only
     // Create a new cluster data cache to simulate cluster change
@@ -423,15 +426,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // assignment since there is only current state change.
     newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
-    algorithmResult = _algorithm.getRebalanceResult();
+    Map<String, ResourceAssignment> newAlgorithmResult = _algorithm.getRebalanceResult();
 
     // Verify that only the changed resource has been included in the calculation.
-    validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
-    // Both assignment state should be empty.
+    validateRebalanceResult(Collections.emptyMap(), newIdealStates, newAlgorithmResult);
+    // There should be no changes in the baseline since only the currentStates changed
     baseline = _metadataStore.getBaseline();
-    Assert.assertEquals(baseline, Collections.emptyMap());
+    Assert.assertEquals(baseline, algorithmResult);
+    // The BestPossible assignment should have been updated since computeNewIdealStates() should have been called.
     bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
-    Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
+    Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
 
     // 4. rebalance with no change but best possible state record missing.
     // This usually happens when the persisted assignment state is gone.
@@ -440,15 +444,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // calculate the assignment for both resources.
     newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
-    algorithmResult = _algorithm.getRebalanceResult();
+    newAlgorithmResult = _algorithm.getRebalanceResult();
     // Verify that both resource has been included in the calculation.
-    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
-    // Both assignment state should be empty since no cluster topology change.
+    validateRebalanceResult(resourceMap, newIdealStates, newAlgorithmResult);
+    // There should not be any changes in the baseline.
     baseline = _metadataStore.getBaseline();
-    Assert.assertEquals(baseline, Collections.emptyMap());
-    // The best possible assignment should be present.
+    Assert.assertEquals(baseline, algorithmResult);
+    // The BestPossible assignment should have been updated since computeNewIdealStates() should have been called.
     bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
-    Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+    Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
   }
 
   private void validateRebalanceResult(Map<String, Resource> resourceMap,
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index 30700ed..83229a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -123,6 +123,8 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
     // Cluster config change will trigger baseline recalculation and partial rebalance.
     when(clusterData.getRefreshedChangeTypes())
         .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    // Add a field to the cluster config so the cluster config will be marked as changed in the change detector.
+    clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
 
     rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());