You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:22 UTC
[helix] 19/37: Implement the WAGED rebalancer with the limited
functionality. (#443)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 702c547d987137102c3e9086516cce20fc1e3076
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Sep 9 10:35:59 2019 -0700
Implement the WAGED rebalancer with the limited functionality. (#443)
The implemented rebalancer supports the basic rebalance logic. It does not contain the logic to support delayed rebalance or user-defined preference lists.
Added unit tests to cover the main workflow of the WAGED rebalancer.
---
.../org/apache/helix/HelixRebalanceException.java | 4 +-
.../rebalancer/waged/WagedRebalancer.java | 313 ++++++++++++++--
.../rebalancer/waged/model/AssignableNode.java | 7 +-
.../waged/MockAssignmentMetadataStore.java | 55 +++
.../rebalancer/waged/TestWagedRebalancer.java | 415 +++++++++++++++++++++
.../waged/constraints/MockRebalanceAlgorithm.java | 84 +++++
.../waged/model/AbstractTestClusterModel.java | 4 +-
7 files changed, 850 insertions(+), 32 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
index d01fc60..a8b5055 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -33,12 +33,12 @@ public class HelixRebalanceException extends Exception {
private final Type _type;
public HelixRebalanceException(String message, Type type, Throwable cause) {
- super(String.format("%s. Failure Type: %s", message, type.name()), cause);
+ super(String.format("%s Failure Type: %s", message, type.name()), cause);
_type = type;
}
public HelixRebalanceException(String message, Type type) {
- super(String.format("%s. Failure Type: %s", message, type.name()));
+ super(String.format("%s Failure Type: %s", message, type.name()));
_type = type;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 5b9634e..866c7c9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,9 +19,18 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -29,14 +38,18 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A placeholder before we have the implementation.
* Weight-Aware Globally-Even Distribute Rebalancer.
*
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
@@ -46,50 +59,296 @@ import org.slf4j.LoggerFactory;
public class WagedRebalancer {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
+ // When any of the following changes happen, the rebalancer needs to do a global rebalance, which
+ // consists of 1. a baseline recalculation, 2. a partial rebalance that is based on the new baseline.
+ private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
+ Collections.unmodifiableSet(new HashSet<>(Arrays
+ .asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
+ HelixConstants.ChangeType.CLUSTER_CONFIG,
+ HelixConstants.ChangeType.INSTANCE_CONFIG)));
+ // The cluster change detector is a stateful object.
+ // Make it static to avoid unnecessary reinitialization.
+ private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
+ new ThreadLocal<>();
+ private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
+
// --------- The following fields are placeholders and need replacement. -----------//
// TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//
- // The cluster change detector is a stateful object. Make it static to avoid unnecessary
- // reinitialization.
- private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
- new ThreadLocal<>();
- private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
+ public WagedRebalancer(HelixManager helixManager) {
+ this(
+ // TODO init the metadata store according to their requirement when integrate,
+ // or change to final static method if possible.
+ new AssignmentMetadataStore(),
+ // TODO parse the cluster setting
+ ConstraintBasedAlgorithmFactory.getInstance(),
+ // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
+ // Mapping calculator will translate the best possible assignment into the applicable state
+ // mapping based on the current states.
+ // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
+ new DelayedAutoRebalancer());
+ }
- private ResourceChangeDetector getChangeDetector() {
- if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
- CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
- }
- return CHANGE_DETECTOR_THREAD_LOCAL.get();
+ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+ RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+ _assignmentMetadataStore = assignmentMetadataStore;
+ _rebalanceAlgorithm = algorithm;
+ _mappingCalculator = mappingCalculator;
}
- public WagedRebalancer(HelixManager helixManager) {
- // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
- _assignmentMetadataStore = new AssignmentMetadataStore();
- // TODO parse the cluster setting
- _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance();
-
- // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
- // output.
- // This calculator will translate the best possible assignment into an applicable state mapping
- // based on the current states.
- // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
- _mappingCalculator = new DelayedAutoRebalancer();
+ @VisibleForTesting
+ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+ RebalanceAlgorithm algorithm) {
+ this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
}
/**
- * Compute the new IdealStates for all the resources input. The IdealStates include both the new
+ * Compute the new IdealStates for all the input resources. The IdealStates include both new
* partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
+ *
* @param clusterData The Cluster status data provider.
* @param resourceMap A map containing all the rebalancing resources.
- * @param currentStateOutput The present Current State of the cluster.
- * @return A map containing the computed new IdealStates.
+ * @param currentStateOutput The present Current States of the resources.
+ * @return A map of the new IdealStates with the resource name as key.
*/
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
- return new HashMap<>();
+ LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
+
+ // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
+ resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
+ IdealState is = clusterData.getIdealState(resourceEntry.getKey());
+ return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+ && getClass().getName().equals(is.getRebalancerClassName());
+ }).collect(Collectors
+ .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue()));
+
+ if (resourceMap.isEmpty()) {
+ LOG.warn("There is no valid resource to be rebalanced by {}",
+ this.getClass().getSimpleName());
+ return Collections.emptyMap();
+ } else {
+ LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(),
+ resourceMap.keySet().toString());
+ }
+
+ // Calculate the target assignment based on the current cluster status.
+ Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);
+
+ // Construct the new best possible states according to the current state and target assignment.
+ // Note that the new ideal state might be an intermediate state between the current state and the target assignment.
+ for (IdealState is : newIdealStates.values()) {
+ String resourceName = is.getResourceName();
+ // Adjust the states according to the current state.
+ ResourceAssignment finalAssignment = _mappingCalculator
+ .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName),
+ currentStateOutput);
+
+ // Clean up the state mapping fields. Use the final assignment that is calculated by the
+ // mapping calculator to replace them.
+ is.getRecord().getMapFields().clear();
+ for (Partition partition : finalAssignment.getMappedPartitions()) {
+ Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition);
+ // if the final states cannot be generated, override the best possible state with empty map.
+ is.setInstanceStateMap(partition.getPartitionName(),
+ newStateMap == null ? Collections.emptyMap() : newStateMap);
+ }
+ }
+
+ LOG.info("Finish computing new ideal states for resources: {}",
+ resourceMap.keySet().toString());
+ return newIdealStates;
+ }
+
+ // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+ // Returns one IdealState per resource in the new assignment, keyed by resource name.
+ private Map<String, IdealState> computeBestPossibleStates(
+     ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
+     throws HelixRebalanceException {
+   // getChangeDetector() hands back the calling thread's detector on every call, so fetch it once.
+   ResourceChangeDetector detector = getChangeDetector();
+   detector.updateSnapshots(clusterData);
+   // Collect the keys of all the new and modified items, grouped by change type.
+   Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
+       detector.getChangeTypes().stream()
+           .collect(Collectors.toMap(type -> type, type -> {
+             Set<String> changedKeys = new HashSet<>();
+             changedKeys.addAll(detector.getAdditionsByType(type));
+             changedKeys.addAll(detector.getChangesByType(type));
+             return changedKeys;
+           }));
+
+   boolean globalRebalanceRequired = clusterChanges.keySet().stream()
+       .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains);
+   if (globalRebalanceRequired) {
+     refreshBaseline(clusterData, clusterChanges, resourceMap);
+     // Inject a cluster config change for large scale partial rebalance once the baseline changed.
+     clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
+   }
+
+   Map<String, ResourceAssignment> newAssignment =
+       partialRebalance(clusterData, clusterChanges, resourceMap);
+
+   // Convert the assignments into IdealStates for the following state mapping calculation.
+   Map<String, IdealState> finalIdealStates = new HashMap<>();
+   for (Map.Entry<String, ResourceAssignment> assignmentEntry : newAssignment.entrySet()) {
+     String resourceName = assignmentEntry.getKey();
+     IdealState newIdealState;
+     try {
+       IdealState currentIdealState = clusterData.getIdealState(resourceName);
+       Map<String, Integer> statePriorityMap = clusterData
+           .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+       // Build a new IdealState that carries the newly calculated assignment in its preference
+       // lists.
+       newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState,
+           assignmentEntry.getValue(), statePriorityMap);
+     } catch (Exception ex) {
+       // Any failure while building the IdealState is reported as an invalid cluster status.
+       throw new HelixRebalanceException(
+           "Fail to calculate the new IdealState for resource: " + resourceName,
+           HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+     }
+     finalIdealStates.put(resourceName, newIdealState);
+   }
+   return finalIdealStates;
+ }
+
+ // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
+ // Recalculate the baseline assignment and persist it to the assignment metadata store.
+ private void refreshBaseline(ResourceControllerDataProvider clusterData,
+     Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
+     throws HelixRebalanceException {
+   LOG.info("Start calculating the new baseline.");
+   Map<String, ResourceAssignment> storedBaseline;
+   try {
+     storedBaseline = _assignmentMetadataStore.getBaseline();
+   } catch (Exception ex) {
+     throw new HelixRebalanceException("Failed to get the current baseline assignment.",
+         HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+   }
+
+   // For the baseline calculation:
+   // 1. Node status (disabled/offline) is ignored, so all the instances are passed in.
+   // 2. The stored baseline is used as the previous best possible assignment, since there is no
+   // "baseline" for the baseline. The baseline reference itself is therefore left empty.
+   Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterData, clusterChanges,
+       resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), storedBaseline);
+
+   try {
+     _assignmentMetadataStore.persistBaseline(newBaseline);
+   } catch (Exception ex) {
+     throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+         HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+   }
+   LOG.info("Finish calculating the new baseline.");
+ }
+
+ // Calculate the new best possible assignment on the enabled live instances, using the persisted
+ // baseline and the persisted best possible assignment as references, then persist the result.
+ private Map<String, ResourceAssignment> partialRebalance(
+     ResourceControllerDataProvider clusterData,
+     Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
+     throws HelixRebalanceException {
+   LOG.info("Start calculating the new best possible assignment.");
+   Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+
+   Map<String, ResourceAssignment> storedBaseline;
+   Map<String, ResourceAssignment> storedBestPossible;
+   try {
+     storedBaseline = _assignmentMetadataStore.getBaseline();
+     storedBestPossible = _assignmentMetadataStore.getBestPossibleAssignment();
+   } catch (Exception ex) {
+     throw new HelixRebalanceException("Failed to get the persisted assignment records.",
+         HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+   }
+
+   Map<String, ResourceAssignment> bestPossibleAssignment = calculateAssignment(clusterData,
+       clusterChanges, resourceMap, enabledLiveInstances, storedBaseline, storedBestPossible);
+
+   try {
+     // TODO Test to confirm if persisting the final assignment (with final partition states)
+     // would be a better option.
+     _assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+   } catch (Exception ex) {
+     throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
+         HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+   }
+   LOG.info("Finish calculating the new best possible assignment.");
+   return bestPossibleAssignment;
+ }
+
+ /**
+ * Generate the cluster model based on the input and calculate the optimal assignment.
+ *
+ * @param clusterData the cluster data cache.
+ * @param clusterChanges the detected cluster changes.
+ * @param resourceMap the rebalancing resources.
+ * @param activeNodes the alive and enabled nodes.
+ * @param baseline the baseline assignment for the algorithm as a reference.
+ * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference.
+ * @return the new optimal assignment for the resources.
+ */
+ private Map<String, ResourceAssignment> calculateAssignment(
+ ResourceControllerDataProvider clusterData,
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, Map<String, ResourceAssignment> baseline,
+ Map<String, ResourceAssignment> prevBestPossibleAssignment) throws HelixRebalanceException {
+ long startTime = System.currentTimeMillis();
+ LOG.info("Start calculating for an assignment");
+ ClusterModel clusterModel;
+ try {
+ clusterModel = ClusterModelProvider
+ .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline,
+ prevBestPossibleAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+
+ OptimalAssignment optimalAssignment = _rebalanceAlgorithm.calculate(clusterModel);
+ Map<String, ResourceAssignment> newAssignment =
+ optimalAssignment.getOptimalResourceAssignment();
+
+ LOG.info("Finish calculating. Time spent: {}ms.", System.currentTimeMillis() - startTime);
+ return newAssignment;
+ }
+
+ // Return the calling thread's change detector, creating and registering it on first use.
+ private ResourceChangeDetector getChangeDetector() {
+   ResourceChangeDetector detector = CHANGE_DETECTOR_THREAD_LOCAL.get();
+   if (detector == null) {
+     detector = new ResourceChangeDetector();
+     CHANGE_DETECTOR_THREAD_LOCAL.set(detector);
+   }
+   return detector;
+ }
+
+ // Generate a new IdealState based on the input newAssignment.
+ // The assignment will be propagated to the preference lists only.
+ // The states are recalculated later based on the current state, so the mapping fields of the
+ // returned IdealState are intentionally left untouched here.
+ private IdealState generateIdealStateWithAssignment(String resourceName,
+     IdealState currentIdealState, ResourceAssignment newAssignment,
+     Map<String, Integer> statePriorityMap) {
+   IdealState result = new IdealState(resourceName);
+   // Carry over the simple fields of the current IdealState.
+   result.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+   // Fill in the preference lists, sorted according to state priority.
+   Map<String, List<String>> preferenceLists = getPreferenceLists(newAssignment, statePriorityMap);
+   result.setPreferenceLists(preferenceLists);
+   // The state mapping in the new assignment is not propagated to the map fields; the rebalancer
+   // calculates the final state mapping considering the current states.
+   return result;
+ }
+
+ // Generate the preference lists from the state mapping: state priority first, node name on ties.
+ private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
+     Map<String, Integer> statePriorityMap) {
+   Map<String, List<String>> preferenceList = new HashMap<>();
+   for (Partition partition : newAssignment.getMappedPartitions()) {
+     // Look the replica map up once per partition rather than on every comparison.
+     Map<String, String> replicaMap = newAssignment.getReplicaMap(partition);
+     List<String> nodes = new ArrayList<>(replicaMap.keySet());
+     // To ensure backward compatibility, sort the preference list according to state priority.
+     nodes.sort((node1, node2) -> {
+       int statePriority1 = statePriorityMap.get(replicaMap.get(node1));
+       int statePriority2 = statePriorityMap.get(replicaMap.get(node2));
+       if (statePriority1 == statePriority2) {
+         return node1.compareTo(node2);
+       }
+       // Integer.compare cannot overflow, unlike subtracting one priority from the other.
+       return Integer.compare(statePriority1, statePriority2);
+     });
+     preferenceList.put(partition.getPartitionName(), nodes);
+   }
+   return preferenceList;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 33677e5..4141d20 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -41,7 +41,7 @@ import static java.lang.Math.max;
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
*/
-public class AssignableNode {
+public class AssignableNode implements Comparable<AssignableNode> {
private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
// basic node information
@@ -384,4 +384,9 @@ public class AssignableNode {
public int hashCode() {
return _instanceName.hashCode();
}
+
+ @Override
+ public int compareTo(AssignableNode o) {
+ return _instanceName.compareTo(o.getInstanceName()); // nodes are ordered by their instance names
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
new file mode 100644
index 0000000..ea8c164
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -0,0 +1,55 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock-up metadata store for unit tests.
+ * This mock datastore keeps the assignments in memory only. The maps handed to the persist
+ * methods are stored by reference, not copied.
+ */
+public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
+  private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
+  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
+
+  /** @return the baseline most recently persisted, or an empty map if none was persisted. */
+  @Override
+  public Map<String, ResourceAssignment> getBaseline() {
+    return _persistGlobalBaseline;
+  }
+
+  /** Replace the in-memory baseline with the given map. */
+  @Override
+  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    _persistGlobalBaseline = globalBaseline;
+  }
+
+  /** @return the best possible assignment most recently persisted, or an empty map if none. */
+  @Override
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    return _persistBestPossibleAssignment;
+  }
+
+  /** Replace the in-memory best possible assignment with the given map. */
+  @Override
+  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    _persistBestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  /**
+   * Reset the store to its initial empty state.
+   * Fresh maps are installed instead of clearing the stored ones, because the stored maps are the
+   * very instances passed in by the callers of the persist methods and must not be emptied.
+   */
+  public void clearMetadataStore() {
+    _persistBestPossibleAssignment = new HashMap<>();
+    _persistGlobalBaseline = new HashMap<>();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
new file mode 100644
index 0000000..6759a10
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -0,0 +1,415 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+public class TestWagedRebalancer extends AbstractTestClusterModel {
+ private Set<String> _instances;
+ private MockRebalanceAlgorithm _algorithm;
+
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ _instances = new HashSet<>();
+ _instances.add(_testInstanceId);
+ _algorithm = new MockRebalanceAlgorithm();
+ }
+
+ @Override
+ protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+ ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+ // Set up mock idealstate
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ IdealState is = new IdealState(resource);
+ is.setNumPartitions(_partitionNames.size());
+ is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ is.setStateModelDefRef("MasterSlave");
+ is.setReplicas("3");
+ is.setRebalancerClassName(WagedRebalancer.class.getName());
+ _partitionNames.stream()
+ .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
+ isMap.put(resource, is);
+ }
+ when(testCache.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(testCache.getIdealStates()).thenReturn(isMap);
+
+ // Set up 2 more instances
+ for (int i = 1; i < 3; i++) {
+ String instanceName = _testInstanceId + i;
+ _instances.add(instanceName);
+ // 1. Set up the default instance information with capacity configuration.
+ InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
+ Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+ instanceConfigMap.put(instanceName, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ // 2. Mock the live instance node for the default instance.
+ LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
+ Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+ liveInstanceMap.put(instanceName, testLiveInstance);
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ }
+
+ return testCache;
+ }
+
+ @Test
+ public void testRebalance() throws IOException, HelixRebalanceException {
+ // Init mock metadatastore for the unit test
+ MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+ // Since there is no special condition, the calculated IdealStates should be exactly the same
+ // as the mock algorithm result.
+ validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ }
+
+ @Test(dependsOnMethods = "testRebalance")
+ public void testPartialRebalance() throws IOException, HelixRebalanceException {
+ // Init mock metadatastore for the unit test
+ MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+
+ // Test with partial resources listed in the resourceMap input.
+ // Remove the first resource from the input. Note it still exists in the cluster data cache.
+ metadataStore.clearMetadataStore();
+ resourceMap.remove(_resourceNames.get(0));
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+ validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ }
+
+ @Test(dependsOnMethods = "testRebalance")
+ public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
+ // Init mock metadatastore for the unit test
+ MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+
+ // Test with current state exists, so the rebalancer should calculate for the intermediate state
+ // Create current state based on the cluster data cache.
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ for (String instanceName : _instances) {
+ for (Map.Entry<String, CurrentState> csEntry : clusterData
+ .getCurrentState(instanceName, _sessionId).entrySet()) {
+ String resourceName = csEntry.getKey();
+ CurrentState cs = csEntry.getValue();
+ for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
+ currentStateOutput
+ .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
+ instanceName, partitionStateEntry.getValue());
+ }
+ }
+ }
+
+ // The state calculation will be adjusted based on the current state.
+ // So test the following cases:
+ // 1.1. Disable a resource, and the partitions in CS will be offline.
+ String disabledResourceName = _resourceNames.get(0);
+ clusterData.getIdealState(disabledResourceName).enable(false);
+ // 1.2. Adding more unknown partitions to the CS, so they will be dropped.
+ String droppingResourceName = _resourceNames.get(1);
+ String droppingPartitionName = "UnknownPartition";
+ String droppingFromInstance = _testInstanceId;
+ currentStateOutput.setCurrentState(droppingResourceName, new Partition(droppingPartitionName),
+ droppingFromInstance, "SLAVE");
+ resourceMap.get(droppingResourceName).addPartition(droppingPartitionName);
+
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
+ // All the replica state should be OFFLINE
+ IdealState disabledIdealState = newIdealStates.get(disabledResourceName);
+ for (String partition : disabledIdealState.getPartitionSet()) {
+ Assert.assertTrue(disabledIdealState.getInstanceStateMap(partition).values().stream()
+ .allMatch(state -> state.equals("OFFLINE")));
+ }
+ // the dropped partition should be dropped.
+ IdealState droppedIdealState = newIdealStates.get(droppingResourceName);
+ Assert.assertEquals(
+ droppedIdealState.getInstanceStateMap(droppingPartitionName).get(droppingFromInstance),
+ "DROPPED");
+ }
+
+ /**
+ * Checks that a resource whose ideal state is configured with another rebalancer class is left
+ * out of the rebalancer output, while the remaining resources are still calculated.
+ */
+ @Test(dependsOnMethods = "testRebalance")
+ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+ MockAssignmentMetadataStore mockStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(mockStore, _algorithm);
+
+ // Point the first resource at a different rebalancer class.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ String nonCompatibleResourceName = _resourceNames.get(0);
+ IdealState nonCompatibleIdealState = clusterData.getIdealState(nonCompatibleResourceName);
+ nonCompatibleIdealState.setRebalancerClassName(CrushRebalanceStrategy.class.getName());
+
+ // The input still lists every resource, including the non-compatible one.
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, idealStateEntry -> {
+ Resource resource = new Resource(idealStateEntry.getKey());
+ idealStateEntry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+
+ // The expected output is everything except the non-compatible resource.
+ resourceMap.remove(nonCompatibleResourceName);
+ validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ }
+
+ // TODO test with invalid capacity configuration which will fail the cluster model constructing.
+ /**
+ * Checks that the rebalancer reports an INVALID_CLUSTER_STATUS failure when an ideal state
+ * refers to a state model definition that does not exist.
+ */
+ @Test(dependsOnMethods = "testRebalance")
+ public void testInvalidClusterStatus() throws IOException {
+ MockAssignmentMetadataStore mockStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(mockStore, _algorithm);
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // Break the first resource by referring to a state model that is not defined.
+ String invalidResource = _resourceNames.get(0);
+ IdealState invalidIdealState = clusterData.getIdealState(invalidResource);
+ invalidIdealState.setStateModelDefRef("foobar");
+
+ // The input lists every resource known to the cluster data cache.
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream()
+ .collect(Collectors.toMap(name -> name, Resource::new));
+
+ try {
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Assert.fail("Rebalance shall fail.");
+ } catch (HelixRebalanceException expected) {
+ HelixRebalanceException.Type failureType = expected.getFailureType();
+ Assert.assertEquals(failureType, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+ String failureMessage = expected.getMessage();
+ Assert.assertEquals(failureMessage,
+ "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS");
+ }
+ }
+
+ @Test(dependsOnMethods = "testRebalance")
+ public void testInvalidRebalancerStatus() throws IOException {
+ // Mock a metadata store that will fail on all the calls.
+ AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
+ when(metadataStore.getBaseline())
+ .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // The input resource Map shall contain all the valid resources.
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
+ Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+ try {
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Assert.fail("Rebalance shall fail.");
+ } catch (HelixRebalanceException ex) {
+ Assert.assertEquals(ex.getFailureType(),
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
+ Assert.assertEquals(ex.getMessage(),
+ "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS");
+ }
+ }
+
+ /**
+ * Checks that a HelixRebalanceException thrown by the rebalance algorithm reaches the caller of
+ * the rebalancer with its failure type and message intact.
+ */
+ @Test(dependsOnMethods = "testRebalance")
+ public void testAlgorithmExepction() throws IOException, HelixRebalanceException {
+ // A mocked algorithm that fails on every calculation.
+ HelixRebalanceException algorithmFailure = new HelixRebalanceException("Algorithm fails.",
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
+ when(badAlgorithm.calculate(any())).thenThrow(algorithmFailure);
+
+ MockAssignmentMetadataStore mockStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(mockStore, badAlgorithm);
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream()
+ .collect(Collectors.toMap(name -> name, Resource::new));
+
+ try {
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Assert.fail("Rebalance shall fail.");
+ } catch (HelixRebalanceException expected) {
+ HelixRebalanceException.Type failureType = expected.getFailureType();
+ Assert.assertEquals(failureType, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ String failureMessage = expected.getMessage();
+ Assert.assertEquals(failureMessage, "Algorithm fails. Failure Type: FAILED_TO_CALCULATE");
+ }
+ }
+
+ /**
+ * Runs the same rebalancer instance four times in a row against freshly created cluster data
+ * caches that report different change types (cluster config, ideal state, current state, and
+ * no change at all), and checks after each run the returned ideal states as well as the
+ * baseline and best possible assignment kept in the mock metadata store.
+ */
+ @Test(dependsOnMethods = "testRebalance")
+ public void testRebalanceOnChanges() throws IOException, HelixRebalanceException {
+ // Test continuously rebalance with the same rebalancer with different internal state. Ensure
+ // that the rebalancer handles different input (different cluster changes) based on the internal
+ // state in a correct way.
+
+ // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
+ // won't propagate any existing assignment from the cluster model.
+
+ // Init mock metadatastore for the unit test
+ MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+ // 1. rebalance with baseline calculation done
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+ // Since there is no special condition, the calculated IdealStates should be exactly the same
+ // as the mock algorithm result.
+ validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ Map<String, ResourceAssignment> baseline = metadataStore.getBaseline();
+ Assert.assertEquals(baseline, algorithmResult);
+ Map<String, ResourceAssignment> bestPossibleAssignment =
+ metadataStore.getBestPossibleAssignment();
+ Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+
+ // 2. rebalance with one ideal state changed only
+ String changedResourceName = _resourceNames.get(0);
+ // Create a new cluster data cache to simulate cluster change
+ clusterData = setupClusterDataCache();
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
+ IdealState is = clusterData.getIdealState(changedResourceName);
+ // Update the tag so the ideal state will be marked as changed.
+ is.setInstanceGroupTag("newTag");
+
+ // Although the input contains 2 resources, the rebalancer shall only call the algorithm to
+ // rebalance the changed one.
+ newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ Map<String, ResourceAssignment> partialAlgorithmResult = _algorithm.getRebalanceResult();
+
+ // Verify that only the changed resource has been included in the calculation.
+ validateRebalanceResult(
+ Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
+ newIdealStates, partialAlgorithmResult);
+ // Baseline should be empty, because there is no cluster topology change.
+ baseline = metadataStore.getBaseline();
+ Assert.assertEquals(baseline, Collections.emptyMap());
+ // Best possible assignment contains the new assignment of only one resource.
+ bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
+
+ // * Before the next test, recover the best possible assignment record.
+ metadataStore.persistBestPossibleAssignment(algorithmResult);
+
+ // 3. rebalance with current state change only
+ // Create a new cluster data cache to simulate cluster change
+ clusterData = setupClusterDataCache();
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CURRENT_STATE));
+ // Modify any current state
+ CurrentState cs =
+ clusterData.getCurrentState(_testInstanceId, _sessionId).get(_resourceNames.get(0));
+ // Update the partition info so the current state record is modified.
+ cs.setInfo(_partitionNames.get(0), "mock update");
+
+ // Although the input contains 2 resources, the rebalancer shall not try to recalculate
+ // assignment since there is only current state change.
+ newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ algorithmResult = _algorithm.getRebalanceResult();
+
+ // Verify that no resource has been included in the calculation.
+ validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
+ // Both assignment state should be empty.
+ baseline = metadataStore.getBaseline();
+ Assert.assertEquals(baseline, Collections.emptyMap());
+ bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
+
+ // 4. rebalance with no change but best possible state record missing.
+ // This usually happens when the persisted assignment state is gone.
+ clusterData = setupClusterDataCache(); // Note this mock data cache won't report any change.
+ // Even with no change, since the previous assignment is empty, the rebalancer will still
+ // calculate the assignment for both resources.
+ newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ algorithmResult = _algorithm.getRebalanceResult();
+ // Verify that both resources have been included in the calculation.
+ validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ // The baseline should be empty since there is no cluster topology change.
+ baseline = metadataStore.getBaseline();
+ Assert.assertEquals(baseline, Collections.emptyMap());
+ // The best possible assignment should be present.
+ bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+ }
+
+ /**
+ * Asserts that the rebalancer output matches the expectation.
+ * The new ideal states must cover exactly the resources in the resource map. For every resource
+ * in the expected result, the ideal state must contain the same partitions and the same
+ * instance-to-state map per partition as the expected assignment.
+ *
+ * @param resourceMap the resources that are expected to appear in the rebalancer output
+ * @param newIdealStates the ideal states returned by the rebalancer
+ * @param expectedResult the expected assignment of each resource
+ */
+ private void validateRebalanceResult(Map<String, Resource> resourceMap,
+ Map<String, IdealState> newIdealStates, Map<String, ResourceAssignment> expectedResult) {
+ Assert.assertEquals(newIdealStates.keySet(), resourceMap.keySet());
+ for (Map.Entry<String, ResourceAssignment> expectedEntry : expectedResult.entrySet()) {
+ String resourceName = expectedEntry.getKey();
+ ResourceAssignment expectedAssignment = expectedEntry.getValue();
+ Assert.assertTrue(newIdealStates.containsKey(resourceName));
+ IdealState idealState = newIdealStates.get(resourceName);
+
+ // Compare the partition names first.
+ HashSet<String> expectedPartitionNames = new HashSet<>();
+ for (Partition mappedPartition : expectedAssignment.getMappedPartitions()) {
+ expectedPartitionNames.add(mappedPartition.getPartitionName());
+ }
+ Assert.assertEquals(idealState.getPartitionSet(), expectedPartitionNames);
+
+ // Then compare the replica placement of each partition.
+ for (String partitionName : idealState.getPartitionSet()) {
+ Assert.assertEquals(idealState.getInstanceStateMap(partitionName),
+ expectedAssignment.getReplicaMap(new Partition(partitionName)));
+ }
+ }
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
new file mode 100644
index 0000000..2a39482
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -0,0 +1,84 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * A mock rebalance algorithm for unit tests.
+ * It deals the sorted replicas to the sorted nodes one at a time, like dealing cards. Unlike a
+ * real algorithm, it does not carry the existing assignment over to its output, which keeps the
+ * test expectations simple.
+ */
+public class MockRebalanceAlgorithm implements RebalanceAlgorithm {
+ // The assignment produced by the latest calculate() call; empty before the first call.
+ Map<String, ResourceAssignment> _resultHistory = Collections.emptyMap();
+
+ /**
+ * Deals every assignable replica of the cluster model to the assignable nodes in turn and
+ * remembers the outcome so that tests can read it back through {@link #getRebalanceResult()}.
+ *
+ * @param clusterModel the cluster model providing the assignable nodes and replicas
+ * @return a mocked OptimalAssignment that returns the calculated resource assignments
+ */
+ @Override
+ public OptimalAssignment calculate(ClusterModel clusterModel) {
+ Map<String, ResourceAssignment> dealtAssignments = new HashMap<>();
+ Iterator<AssignableNode> nodes = sortedNodes(clusterModel);
+ for (String resourceName : clusterModel.getAssignableReplicaMap().keySet()) {
+ Iterator<AssignableReplica> replicas =
+ clusterModel.getAssignableReplicaMap().get(resourceName).stream().sorted().iterator();
+ while (replicas.hasNext()) {
+ AssignableReplica replica = replicas.next();
+ // Start over from the first node once every node has been dealt a replica.
+ if (!nodes.hasNext()) {
+ nodes = sortedNodes(clusterModel);
+ }
+ deal(dealtAssignments, replica, nodes.next());
+ }
+ }
+
+ _resultHistory = dealtAssignments;
+
+ // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready.
+ OptimalAssignment mockedAssignment = Mockito.mock(OptimalAssignment.class);
+ when(mockedAssignment.getOptimalResourceAssignment()).thenReturn(dealtAssignments);
+ return mockedAssignment;
+ }
+
+ /**
+ * @return the resource assignments calculated by the latest calculate() call
+ */
+ public Map<String, ResourceAssignment> getRebalanceResult() {
+ return _resultHistory;
+ }
+
+ // Creates a fresh iterator over the assignable nodes in their sorted order.
+ private static Iterator<AssignableNode> sortedNodes(ClusterModel clusterModel) {
+ return clusterModel.getAssignableNodes().values().stream().sorted().iterator();
+ }
+
+ // Records in the assignments that the given replica is placed on the given node.
+ private static void deal(Map<String, ResourceAssignment> assignments,
+ AssignableReplica replica, AssignableNode node) {
+ ResourceAssignment assignment =
+ assignments.computeIfAbsent(replica.getResourceName(), ResourceAssignment::new);
+ Partition partition = new Partition(replica.getPartitionName());
+ // Register a replica map for the partition before the first replica is put into it.
+ if (assignment.getReplicaMap(partition).isEmpty()) {
+ assignment.addReplicaMap(partition, new HashMap<>());
+ }
+ assignment.getReplicaMap(partition).put(node.getInstanceName(), replica.getReplicaState());
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index a8a5de5..0f799b3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -74,7 +74,7 @@ public abstract class AbstractTestClusterModel {
_testFaultZoneId = "testZone";
}
- InstanceConfig createMockInstanceConfig(String instanceId) {
+ protected InstanceConfig createMockInstanceConfig(String instanceId) {
InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
testInstanceConfig.addTag(_testInstanceTags.get(0));
@@ -83,7 +83,7 @@ public abstract class AbstractTestClusterModel {
return testInstanceConfig;
}
- LiveInstance createMockLiveInstance(String instanceId) {
+ protected LiveInstance createMockLiveInstance(String instanceId) {
LiveInstance testLiveInstance = new LiveInstance(instanceId);
testLiveInstance.setSessionId(_sessionId);
return testLiveInstance;