You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/04/25 20:38:05 UTC
[helix] branch master updated: WAGED rebalance overwrite redesign -- part 2 (#2447)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 06401d4b4 WAGED rebalance overwrite redesign -- part 2 (#2447)
06401d4b4 is described below
commit 06401d4b43055dc244446dd0430707de0fbe651c
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Tue Apr 25 16:37:59 2023 -0400
WAGED rebalance overwrite redesign -- part 2 (#2447)
Integrates the redesigned minActiveReplica handling logic into the WAGED rebalancer.
---
.../rebalancer/util/DelayedRebalanceUtil.java | 46 +++++-
.../rebalancer/waged/WagedRebalancer.java | 150 ++++++++++----------
.../java/org/apache/helix/util/RebalanceUtil.java | 8 +-
.../rebalancer/waged/TestWagedRebalancer.java | 155 ++++++++++++++++-----
.../waged/model/AbstractTestClusterModel.java | 5 +-
.../waged/model/TestClusterModelProvider.java | 9 --
6 files changed, 244 insertions(+), 129 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 65b1431e4..61985223c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -311,17 +311,18 @@ public class DelayedRebalanceUtil {
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
for (String resourceName : resources) {
- // <partition, <state, instances set>>
- Map<String, Map<String, Set<String>>> stateInstanceMap =
- ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
- String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+ IdealState idealState = clusterData.getIdealState(resourceName);
+ String modelDef = idealState.getStateModelDefRef();
Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+ ResourceConfig mergedResourceConfig =
+ ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);
+
// keep all current assignment and add to allocated replicas
resourceAssignment.getMappedPartitions().forEach(partition ->
resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
- .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+ .add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
partition.getPartitionName(), state, statePriorityMap.get(state)))));
// only proceed for resource requiring delayed rebalance overwrites
List<String> partitions =
@@ -329,12 +330,44 @@ public class DelayedRebalanceUtil {
if (partitions.isEmpty()) {
continue;
}
+ // <partition, <state, instances set>>
+ Map<String, Map<String, Set<String>>> stateInstanceMap =
+ ClusterModelProvider.getStateInstanceMap(resourceAssignment);
toBeAssignedReplicas.addAll(
findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
}
return toBeAssignedReplicas;
}
+ /**
+ * Merge entries from newAssignment into currentResourceAssignment.
+ * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+ * could miss the current partition allocation that is still on offline instances (within the delayed window).
+ * The merge process is independent for each resource:
+ * - a resource missing from currentResourceAssignment is copied over as is;
+ * - otherwise, for each partition of the resource, every <instance, state> pair from newAssignment is added to the
+ * current replica map, replacing the state of an instance that is already present.
+ * The merge runs sequentially: currentResourceAssignment is mutated in place, and a plain {@link Map} (e.g. a
+ * {@link HashMap}) is not guaranteed to tolerate concurrent modification.
+ * @param newAssignment newAssignment to merge and may override currentResourceAssignment
+ * @param currentResourceAssignment the current resource assignment, this map is getting updated during this method.
+ */
+ public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+ Map<String, ResourceAssignment> currentResourceAssignment) {
+ for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
+ String resourceName = entry.getKey();
+ ResourceAssignment assignment = entry.getValue();
+ ResourceAssignment currentAssignment = currentResourceAssignment.get(resourceName);
+ if (currentAssignment == null) {
+ currentResourceAssignment.put(resourceName, assignment);
+ continue;
+ }
+ for (Partition partition : assignment.getMappedPartitions()) {
+ Map<String, String> newReplicaMap = assignment.getReplicaMap(partition);
+ if (newReplicaMap.isEmpty()) {
+ // nothing to merge; leave the current replica map of this partition untouched
+ continue;
+ }
+ // copy the current <instance, state> pairs, overlay the new ones, and write the result back once
+ Map<String, String> merged = new HashMap<>(currentAssignment.getReplicaMap(partition));
+ merged.putAll(newReplicaMap);
+ currentAssignment.addReplicaMap(partition, merged);
+ }
+ }
+ }
+
/**
* From the current assignment, find the partitions that are missing minActiveReplica for ALL resources, return as a
* map keyed by resource name.
@@ -411,7 +444,8 @@ public class DelayedRebalanceUtil {
clusterData.getStateModelDef(clusterData.getIdealState(resourceName).getStateModelDefRef())
.getStatesPriorityList();
final IdealState currentIdealState = clusterData.getIdealState(resourceName);
- final ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
+ final ResourceConfig resourceConfig = ResourceConfig.mergeIdealStateWithResourceConfig(
+ clusterData.getResourceConfig(resourceName), currentIdealState);
final Map<String, Integer> statePriorityMap =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 494baef01..6c1c4d74d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -312,13 +312,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData, newBestPossibleAssignment);
- // The additional rebalance overwrite is required since the calculated mapping may contain
- // some delayed rebalanced assignments.
- if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
- newBestPossibleAssignment)) {
- applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
- _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
- }
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
@@ -339,10 +332,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Perform global rebalance for a new baseline assignment
_globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
// Perform emergency rebalance for a new best possible assignment
- Map<String, ResourceAssignment> newAssignment =
- emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
-
- return newAssignment;
+ return emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
}
/**
@@ -383,6 +373,69 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
return FAILURE_TYPES_TO_PROPAGATE;
}
+ /**
+ * Some partitions may fail to meet minActiveReplica due to delayed rebalance, because some instances are offline yet
+ * active. In this case, additional replicas have to be brought up -- until either the instance comes back, or the
+ * delay times out, at which point a more permanent resolution is computed.
+ * The term "overwrite" is inherited from the historical approach; however, it's no longer technically an overwrite.
+ * It's a formal rebalance process that goes through the algorithm and all constraints.
+ * When an adjustment is needed, currentResourceAssignment is updated in place and returned.
+ * @param clusterData Cluster data cache
+ * @param resourceMap The map of resource to calculate
+ * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+ * delayed rebalance config
+ * @param currentResourceAssignment The current resource assignment or the best possible assignment computed from last
+ * emergency rebalance.
+ * @param algorithm The rebalance algorithm
+ * @return The resource assignment with delayed rebalance minActiveReplica
+ * @throws HelixRebalanceException if computing the adjusted assignment fails
+ */
+ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
+ ResourceControllerDataProvider clusterData,
+ Map<String, Resource> resourceMap,
+ Set<String> activeNodes,
+ Map<String, ResourceAssignment> currentResourceAssignment,
+ RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+ // the "real" live nodes at the time
+ final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
+ // no need for additional process, return the current resource assignment
+ return currentResourceAssignment;
+ }
+ _rebalanceOverwriteCounter.increment(1L);
+ _rebalanceOverwriteLatency.startMeasuringLatency();
+ LOG.info("Start delayed rebalance overwrites in emergency rebalance.");
+ try {
+ // use the "real" live and enabled instances for calculation
+ ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
+ clusterData, resourceMap, enabledLiveInstances, currentResourceAssignment);
+ Map<String, ResourceAssignment> assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+ // keep only the resource entries requiring changes for minActiveReplica
+ assignment.keySet().retainAll(clusterModel.getAssignableReplicaMap().keySet());
+ DelayedRebalanceUtil.mergeAssignments(assignment, currentResourceAssignment);
+ return currentResourceAssignment;
+ } catch (HelixRebalanceException e) {
+ // pass the exception as the trailing argument so the stack trace is logged
+ LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName(), e);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName(), e);
+ // identify the cluster by name (as in the log line), not by dumping the whole cluster config
+ throw new HelixRebalanceException("Failed to compute for delayed rebalance overwrites in cluster "
+ + clusterData.getClusterName(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, e);
+ } finally {
+ _rebalanceOverwriteLatency.endMeasuringLatency();
+ }
+ }
+
+ /**
+ * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
+ * and addressing for partitions failing to meet minActiveReplicas.
+ * The scope of the computation here should be limited to handling urgency only and shouldn't be blocking.
+ * @param clusterData Cluster data cache
+ * @param resourceMap resource map
+ * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+ * delayed rebalance config
+ * @param currentStateOutput Current state output from pipeline
+ * @param algorithm The rebalance algorithm
+ * @return The new resource assignment
+ */
protected Map<String, ResourceAssignment> emergencyRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
@@ -430,6 +483,17 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Step 3: persist result to metadata store
persistBestPossibleAssignment(newAssignment);
+
+ // Step 4: handle delayed rebalance minActiveReplica
+ // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+ // and only apply during the delayed window of those offline yet active nodes, a definitive resolution will happen
+ // once the node comes back of remain offline after the delayed window.
+ Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+ if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+ assignmentWithDelayedRebalanceAdjust =
+ handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
+ }
+
_emergencyRebalanceLatency.endMeasuringLatency();
LOG.info("Finish emergency rebalance");
@@ -438,9 +502,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
newAssignment = _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
persistBestPossibleAssignment(newAssignment);
+ // delayed rebalance handling result is temporary, shouldn't be persisted
+ assignmentWithDelayedRebalanceAdjust =
+ handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
}
- return newAssignment;
+ return assignmentWithDelayedRebalanceAdjust;
}
// Generate the preference lists from the state mapping based on state priority.
@@ -563,65 +630,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
return !allMinActiveReplicaMet.get();
}
- /**
- * Update the rebalanced ideal states according to the real active nodes.
- * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
- * might include inactive nodes.
- * This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
- * @param idealStateMap the calculated ideal states.
- * @param clusterData the cluster data cache.
- * @param resourceMap the rebalanaced resource map.
- * @param baseline the baseline assignment.
- * @param algorithm the rebalance algorithm.
- */
- protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
- ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
- _rebalanceOverwriteCounter.increment(1L);
- _rebalanceOverwriteLatency.startMeasuringLatency();
-
- ClusterModel clusterModel;
- try {
- // Note this calculation uses the baseline as the best possible assignment input here.
- // This is for minimizing unnecessary partition movement.
- clusterModel = ClusterModelProvider
- .generateClusterModelFromExistingAssignment(clusterData, resourceMap, baseline);
- } catch (Exception ex) {
- throw new HelixRebalanceException(
- "Failed to generate cluster model for delayed rebalance overwrite.",
- HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
- }
- Map<String, IdealState> activeIdealStates =
- convertResourceAssignment(clusterData, WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm));
- for (String resourceName : idealStateMap.keySet()) {
- // The new calculated ideal state before overwrite
- IdealState newIdealState = idealStateMap.get(resourceName);
- if (!activeIdealStates.containsKey(resourceName)) {
- throw new HelixRebalanceException(
- "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
- + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- }
- // The ideal state that is calculated based on the real alive/enabled instances list
- IdealState newActiveIdealState = activeIdealStates.get(resourceName);
- // The current ideal state that exists in the IdealState znode
- IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
- int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
- int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
- .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
- currentIdealState), currentIdealState, numReplica);
- Map<String, List<String>> finalPreferenceLists =
- DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
- newIdealState.getPreferenceLists(), enabledLiveInstances,
- Math.min(minActiveReplica, numReplica));
-
- newIdealState.setPreferenceLists(finalPreferenceLists);
-
- _rebalanceOverwriteLatency.endMeasuringLatency();
- }
- }
-
private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
IdealState idealState) {
if (resourceConfig != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index bc4054a52..868e0cf57 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -177,12 +177,8 @@ public class RebalanceUtil {
public static void scheduleOnDemandPipeline(String clusterName, long delay,
boolean shouldRefreshCache) {
- if (clusterName == null) {
- LOG.error("Failed to issue a pipeline run. ClusterName is null.");
- return;
- }
- if (delay < 0L) {
- LOG.error("Failed to issue a pipeline run. Delay is invalid.");
+ if (clusterName == null || delay < 0L) {
+ LOG.warn("Invalid input: [clusterName: {}, delay: {}], skip the pipeline issuing.", clusterName, delay);
return;
}
GenericHelixController leaderController =
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 3344fe14c..a34c92298 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +50,7 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -58,22 +60,19 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestWagedRebalancer extends AbstractTestClusterModel {
- private Set<String> _instances;
private MockRebalanceAlgorithm _algorithm;
private MockAssignmentMetadataStore _metadataStore;
@BeforeClass
public void initialize() {
super.initialize();
- _instances = new HashSet<>();
- _instances.add(_testInstanceId);
_algorithm = new MockRebalanceAlgorithm();
// Initialize a mock assignment metadata store
@@ -93,7 +92,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
is.setStateModelDefRef("MasterSlave");
is.setReplicas("3");
is.setRebalancerClassName(WagedRebalancer.class.getName());
- _partitionNames.stream()
+ _partitionNames
.forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
isMap.put(resource, is);
}
@@ -131,10 +130,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Mocking the change types for triggering a baseline rebalance.
@@ -169,10 +167,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Mocking the change types for triggering a baseline rebalance.
@@ -197,10 +194,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Mocking the change types for triggering a baseline rebalance.
@@ -285,10 +281,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Mocking the change types for triggering a baseline rebalance.
@@ -347,10 +342,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
.setRebalancerClassName(CrushRebalanceStrategy.class.getName());
// The input resource Map shall contain all the valid resources.
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
@@ -368,7 +362,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
clusterData.getIdealState(invalidResource).setStateModelDefRef("foobar");
// The input resource Map shall contain all the valid resources.
Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
- Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+ Collectors.toMap(resourceName -> resourceName, Resource::new));
try {
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
@@ -397,7 +391,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
ResourceControllerDataProvider clusterData = setupClusterDataCache();
// The input resource Map shall contain all the valid resources.
Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
- Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+ Collectors.toMap(resourceName -> resourceName, Resource::new));
try {
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
Assert.fail("Rebalance shall fail.");
@@ -417,10 +411,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Rebalance with normal configuration. So the assignment will be persisted in the metadata store.
@@ -450,7 +443,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Ensure failure has been recorded
Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
- CountMetric.class).getValue().longValue(), 1l);
+ CountMetric.class).getValue().longValue(), 1L);
}
@Test(dependsOnMethods = "testRebalance")
@@ -479,10 +472,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
clusterData.setClusterConfig(clusterConfig);
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
@@ -686,7 +678,12 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Populate best possible assignment
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
- verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+ Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
+ CountMetric.class).getValue().longValue(), 0L);
+ Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ LatencyMetric.class).getLastEmittedMetricValue().longValue(), -1L);
// Set minActiveReplica to 1 so that requireRebalanceOverwrite returns true
for (String resource : _resourceNames) {
@@ -708,7 +705,94 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
clusterData.setClusterConfig(clusterConfig);
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
- verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+ Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
+ CountMetric.class).getValue().longValue(), 1L);
+ Assert.assertTrue(rebalancer.getMetricCollector()
+ .getMetric(WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ LatencyMetric.class).getLastEmittedMetricValue() > 0L);
+ }
+
+ // Exercises the delayed-rebalance minActiveReplica handling with one offline-yet-active instance and
+ // minActiveReplicas = 2: partitions that have a replica on offlineInstance are expected to end up with a 3-entry
+ // preference list, while partitions hosted only on live instances keep 2 entries.
+ @Test(dependsOnMethods = "testRebalanceOverwriteTrigger")
+ public void testRebalanceOverwrite() throws HelixRebalanceException, IOException {
+ _metadataStore.reset();
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // Enable delay rebalance
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ clusterConfig.setDelayRebalaceEnabled(true);
+ clusterConfig.setRebalanceDelayTime(1);
+ clusterData.setClusterConfig(clusterConfig);
+
+ String instance0 = _testInstanceId;
+ String instance1 = instance0 + "1";
+ String instance2 = instance0 + "2";
+ String offlineInstance = "offlineInstance";
+
+ // force create a fake offlineInstance that's in delay window
+ // offlineInstance is reported as an existing and enabled instance, but it is excluded from the enabled live set;
+ // its offline timestamp is set far in the future.
+ Set<String> instances = new HashSet<>(_instances);
+ instances.add(offlineInstance);
+ when(clusterData.getAllInstances()).thenReturn(instances);
+ when(clusterData.getEnabledInstances()).thenReturn(instances);
+ when(clusterData.getEnabledLiveInstances()).thenReturn(Set.of(instance0, instance1, instance2));
+ Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+ instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
+ when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+ when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ // Require at least 2 active replicas for every resource.
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ IdealState idealState = clusterData.getIdealState(resource);
+ idealState.setMinActiveReplicas(2);
+ isMap.put(resource, idealState);
+ }
+ when(clusterData.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(clusterData.getIdealStates()).thenReturn(isMap);
+
+ MockRebalanceAlgorithm algorithm = new MockRebalanceAlgorithm();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, algorithm, Optional.empty());
+
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap =
+ clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+
+ // Current states: one partition per resource has a replica reported OFFLINE on offlineInstance; every other
+ // partition has two replicas on live instances.
+ Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+ _partitionNames.get(1), ImmutableMap.of(instance2, "MASTER", offlineInstance, "OFFLINE"), // Partition2-SLAVE
+ _partitionNames.get(2), ImmutableMap.of(instance1, "SLAVE", instance2, "MASTER"),
+ _partitionNames.get(3), ImmutableMap.of(instance1, "SLAVE", instance2, "SLAVE")),
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+ _partitionNames.get(1), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+ _partitionNames.get(2), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+ _partitionNames.get(3), ImmutableMap.of(offlineInstance, "OFFLINE", instance2, "SLAVE")) // Partition4-MASTER
+ );
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ input.forEach((resource, inputMap) ->
+ inputMap.forEach((partition, stateInstance) ->
+ stateInstance.forEach((tmpInstance, state) ->
+ currentStateOutput.setCurrentState(resource, new Partition(partition), tmpInstance, state))));
+ // With async partial rebalance enabled, emergency rebalance applies the delayed minActiveReplica handling itself.
+ rebalancer.setPartialRebalanceAsyncMode(true);
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
+ // Every partition of both resources is still present.
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceLists().size(), 4);
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceLists().size(), 4);
+ // Partitions with a replica on offlineInstance gain an entry; the others keep their two replicas.
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceList(_partitionNames.get(1)).size(), 3);
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceList(_partitionNames.get(3)).size(), 2);
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceList(_partitionNames.get(3)).size(), 3);
+ Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceList(_partitionNames.get(0)).size(), 2);
}
@Test(dependsOnMethods = "testRebalance")
@@ -718,10 +802,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Resource resource = new Resource(entry.getKey());
- entry.getValue().getPartitionSet().stream()
- .forEach(partition -> resource.addPartition(partition));
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
return resource;
}));
// Mocking the change types for triggering a baseline rebalance.
@@ -764,7 +847,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
IdealState is = newIdealStates.get(resourceName);
ResourceAssignment assignment = expectedResult.get(resourceName);
Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
- .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
+ .stream().map(Partition::getPartitionName).collect(Collectors.toSet())));
for (String partitionName : is.getPartitionSet()) {
Assert.assertEquals(is.getInstanceStateMap(partitionName),
assignment.getReplicaMap(new Partition(partitionName)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 997232a5a..b9c4ce39d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -40,7 +40,7 @@ import org.apache.helix.model.ResourceConfig;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
public abstract class AbstractTestClusterModel {
@@ -52,6 +52,7 @@ public abstract class AbstractTestClusterModel {
protected Map<String, List<String>> _disabledPartitionsMap;
protected List<String> _testInstanceTags;
protected String _testFaultZoneId;
+ protected Set<String> _instances;
@BeforeClass
public void initialize() {
@@ -75,6 +76,8 @@ public abstract class AbstractTestClusterModel {
_testInstanceTags = new ArrayList<>();
_testInstanceTags.add("TestTag");
_testFaultZoneId = "testZone";
+ _instances = new HashSet<>();
+ _instances.add(_testInstanceId);
}
protected InstanceConfig createMockInstanceConfig(String instanceId) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index fc316c2a5..500cfb0b6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -45,23 +45,14 @@ import org.apache.helix.model.ResourceConfig;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
public class TestClusterModelProvider extends AbstractTestClusterModel {
- Set<String> _instances;
Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
- @BeforeClass
- public void initialize() {
- super.initialize();
- _instances = new HashSet<>();
- _instances.add(_testInstanceId);
- }
-
@Override
protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
ResourceControllerDataProvider testCache = super.setupClusterDataCache();