You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/26 03:34:06 UTC
[helix] branch wagedRebalancer updated: Change change detector to a
regular field in the WAGED rebalancer instead of static threadlocal. (#543)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 8df436b Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)
8df436b is described below
commit 8df436b7cabc5dabb87f831d830e8838c612c3a9
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Oct 25 20:33:58 2019 -0700
Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)
* Change change detector to regular field instead of static thread-local.
The rebalancer has been modified to be a thread-local object, so there is no need to keep the change detector as a thread-local as well.
Keeping the change detector as a static thread-local may cause potential problems.
In addition, in order to avoid resource leakage, implement the finalize method of the WagedRebalancer to close all connections.
---
.../rebalancer/waged/WagedRebalancer.java | 18 +++++++-----
.../stages/BestPossibleStateCalcStage.java | 2 +-
.../rebalancer/waged/TestWagedRebalancer.java | 34 ++++++++++++----------
.../waged/TestWagedRebalancerMetrics.java | 2 ++
4 files changed, 32 insertions(+), 24 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index b455992..9dacfbe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -73,10 +73,7 @@ public class WagedRebalancer {
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
- // The cluster change detector is a stateful object.
- // Make it static to avoid unnecessary reinitialization.
- private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
- new ThreadLocal<>();
+ private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
@@ -149,6 +146,7 @@ public class WagedRebalancer {
// allow rebalancer to proceed
_metricCollector =
metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector;
+ _changeDetector = new ResourceChangeDetector();
}
// Release all the resources.
@@ -468,10 +466,7 @@ public class WagedRebalancer {
}
private ResourceChangeDetector getChangeDetector() {
- if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
- CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector(true));
- }
- return CHANGE_DETECTOR_THREAD_LOCAL.get();
+ return _changeDetector;
}
// Generate the preference lists from the state mapping based on state priority.
@@ -684,4 +679,11 @@ public class WagedRebalancer {
protected MetricCollector getMetricCollector() {
return _metricCollector;
}
+
+ @Override
+ protected void finalize()
+ throws Throwable {
+ super.finalize();
+ close();
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1bd1fdf..fcf89b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -283,7 +283,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
}
- // Create MetricCollector ThreadLocal if it hasn't been already initialized
+ // Create WagedRebalancer ThreadLocal if it hasn't been already initialized
if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
WAGED_REBALANCER_THREAD_LOCAL
.set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index abf084d..9782645 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -359,6 +359,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Cluster config change will trigger baseline to be recalculated.
when(clusterData.getRefreshedChangeTypes())
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ // Update the config so the cluster config will be marked as changed.
+ clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
Resource resource = new Resource(entry.getKey());
@@ -373,10 +375,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// as the mock algorithm result.
validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
Map<String, ResourceAssignment> baseline = _metadataStore.getBaseline();
- Assert.assertEquals(baseline, algorithmResult);
+ Assert.assertEquals(algorithmResult, baseline);
Map<String, ResourceAssignment> bestPossibleAssignment =
_metadataStore.getBestPossibleAssignment();
- Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+ Assert.assertEquals(algorithmResult, bestPossibleAssignment);
// 2. rebalance with one resource changed in the Resource Config znode only
String changedResourceName = _resourceNames.get(0);
@@ -398,15 +400,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
validateRebalanceResult(
Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
newIdealStates, partialAlgorithmResult);
- // Baseline should be empty, because there is no cluster topology change.
+ // Baseline contains the new assignment of only one resource.
baseline = _metadataStore.getBaseline();
- Assert.assertEquals(baseline, Collections.emptyMap());
+ Assert.assertEquals(baseline, partialAlgorithmResult);
// Best possible assignment contains the new assignment of only one resource.
bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
// * Before the next test, recover the best possible assignment record.
_metadataStore.persistBestPossibleAssignment(algorithmResult);
+ _metadataStore.persistBaseline(algorithmResult);
// 3. rebalance with current state change only
// Create a new cluster data cache to simulate cluster change
@@ -423,15 +426,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// assignment since there is only current state change.
newIdealStates =
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
- algorithmResult = _algorithm.getRebalanceResult();
+ Map<String, ResourceAssignment> newAlgorithmResult = _algorithm.getRebalanceResult();
// Verify that only the changed resource has been included in the calculation.
- validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
- // Both assignment state should be empty.
+ validateRebalanceResult(Collections.emptyMap(), newIdealStates, newAlgorithmResult);
+ // There should be no changes in the baseline since only the currentStates changed
baseline = _metadataStore.getBaseline();
- Assert.assertEquals(baseline, Collections.emptyMap());
+ Assert.assertEquals(baseline, algorithmResult);
+ // The BestPossible assignment should have been updated since computeNewIdealStates() should have been called.
bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
- Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
+ Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
// 4. rebalance with no change but best possible state record missing.
// This usually happens when the persisted assignment state is gone.
@@ -440,15 +444,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// calculate the assignment for both resources.
newIdealStates =
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
- algorithmResult = _algorithm.getRebalanceResult();
+ newAlgorithmResult = _algorithm.getRebalanceResult();
// Verify that both resource has been included in the calculation.
- validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
- // Both assignment state should be empty since no cluster topology change.
+ validateRebalanceResult(resourceMap, newIdealStates, newAlgorithmResult);
+ // There should not be any changes in the baseline.
baseline = _metadataStore.getBaseline();
- Assert.assertEquals(baseline, Collections.emptyMap());
- // The best possible assignment should be present.
+ Assert.assertEquals(baseline, algorithmResult);
+ // The BestPossible assignment should have been updated since computeNewIdealStates() should have been called.
bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
- Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+ Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
}
private void validateRebalanceResult(Map<String, Resource> resourceMap,
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index 30700ed..83229a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -123,6 +123,8 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
// Cluster config change will trigger baseline recalculation and partial rebalance.
when(clusterData.getRefreshedChangeTypes())
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ // Add a field to the cluster config so the cluster config will be marked as changed in the change detector.
+ clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());