You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/12/08 02:11:06 UTC
(helix) 07/09: Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache. (#2702)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
commit a24ed72b249814722e6ad2ebe86b28bf13d3e358
Author: Zachary Pinto <za...@linkedin.com>
AuthorDate: Tue Nov 28 11:06:02 2023 -0800
Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache. (#2702)
Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache.
---
.../changedetector/ResourceChangeDetector.java | 4 +-
.../changedetector/ResourceChangeSnapshot.java | 41 ++-
.../dataproviders/BaseControllerDataProvider.java | 365 +++++++++++++++------
.../WorkflowControllerDataProvider.java | 3 +-
.../controller/rebalancer/AbstractRebalancer.java | 11 +-
.../controller/rebalancer/AutoRebalancer.java | 34 +-
.../controller/rebalancer/CustomRebalancer.java | 4 +-
.../rebalancer/DelayedAutoRebalancer.java | 75 ++---
.../AbstractEvenDistributionRebalanceStrategy.java | 5 +-
.../strategy/ConstraintRebalanceStrategy.java | 8 +-
.../strategy/CrushRebalanceStrategy.java | 2 +-
.../strategy/MultiRoundCrushRebalanceStrategy.java | 2 +-
.../rebalancer/util/DelayedRebalanceUtil.java | 94 +-----
.../rebalancer/waged/GlobalRebalanceRunner.java | 23 +-
.../rebalancer/waged/WagedRebalancer.java | 61 +---
.../AbstractPartitionMovementConstraint.java | 6 +-
.../constraints/BaselineInfluenceConstraint.java | 2 +-
.../constraints/ConstraintBasedAlgorithm.java | 10 +-
.../constraints/PartitionMovementConstraint.java | 6 +-
.../waged/model/ClusterModelProvider.java | 107 ++++--
.../stages/BestPossibleStateCalcStage.java | 65 +++-
.../stages/IntermediateStateCalcStage.java | 10 +-
.../stages/MaintenanceRecoveryStage.java | 2 +-
.../controller/stages/MessageGenerationPhase.java | 3 +-
.../controller/stages/ReadClusterDataStage.java | 2 -
.../stages/ResourceComputationStage.java | 2 -
.../stages/task/TaskSchedulingStage.java | 1 -
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 14 +-
.../org/apache/helix/model/ResourceAssignment.java | 1 -
.../apache/helix/task/AbstractTaskDispatcher.java | 3 +-
.../java/org/apache/helix/task/JobDispatcher.java | 6 +-
.../helix/tools/ClusterExternalViewVerifier.java | 2 +-
.../StrictMatchExternalViewVerifier.java | 12 +-
.../main/java/org/apache/helix/util/HelixUtil.java | 6 +-
.../trimmer/TestHelixPropoertyTimmer.java | 4 +-
.../rebalancer/TestAutoRebalanceStrategy.java | 2 +-
.../rebalancer/waged/TestWagedRebalancer.java | 43 ++-
.../waged/TestWagedRebalancerMetrics.java | 9 +-
.../TestPartitionMovementConstraint.java | 5 +
.../waged/model/AbstractTestClusterModel.java | 5 +-
.../waged/model/ClusterModelTestHelper.java | 2 +-
.../rebalancer/waged/model/TestAssignableNode.java | 22 +-
.../waged/model/TestClusterModelProvider.java | 31 +-
.../TestBestPossibleCalcStageCompatibility.java | 3 +-
.../stages/TestBestPossibleStateCalcStage.java | 1 +
.../stages/TestCancellationMessageGeneration.java | 1 -
.../stages/TestIntermediateStateCalcStage.java | 1 +
.../controller/stages/TestRebalancePipeline.java | 26 +-
.../stages/TestReplicaLevelThrottling.java | 2 +-
...estOfflineNodeTimeoutDuringMaintenanceMode.java | 12 +-
.../messaging/TestP2PMessageSemiAuto.java | 2 +-
.../messaging/TestP2PNoDuplicatedMessage.java | 4 +-
.../integration/rebalancer/TestAutoRebalance.java | 6 +-
.../TestAutoRebalancePartitionLimit.java | 2 +-
.../rebalancer/TestCustomRebalancer.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 8 +-
.../rebalancer/TestInstanceOperation.java | 54 +--
.../WagedRebalancer/TestWagedNodeSwap.java | 2 +-
.../messaging/p2pMessage/TestP2PMessages.java | 8 +-
.../helix/task/TestTargetedTaskStateChange.java | 8 +-
.../helix/rest/server/AbstractTestClass.java | 2 +-
.../helix/rest/server/TestInstancesAccessor.java | 2 +-
62 files changed, 757 insertions(+), 504 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index ae8253ffb..4e0b706b8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -113,13 +113,13 @@ public class ResourceChangeDetector implements ChangeDetector {
HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
switch (changeType) {
case INSTANCE_CONFIG:
- return snapshot.getInstanceConfigMap();
+ return snapshot.getAssignableInstanceConfigMap();
case IDEAL_STATE:
return snapshot.getIdealStateMap();
case RESOURCE_CONFIG:
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
- return snapshot.getLiveInstances();
+ return snapshot.getAssignableLiveInstances();
case CLUSTER_CONFIG:
ClusterConfig config = snapshot.getClusterConfig();
if (config == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
index 39fccd8d0..f37cae635 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -50,10 +50,12 @@ import org.apache.helix.model.ResourceConfig;
class ResourceChangeSnapshot {
private Set<HelixConstants.ChangeType> _changedTypes;
- private Map<String, InstanceConfig> _instanceConfigMap;
+ private Map<String, InstanceConfig> _allInstanceConfigMap;
+ private Map<String, InstanceConfig> _assignableInstanceConfigMap;
private Map<String, IdealState> _idealStateMap;
private Map<String, ResourceConfig> _resourceConfigMap;
- private Map<String, LiveInstance> _liveInstances;
+ private Map<String, LiveInstance> _allLiveInstances;
+ private Map<String, LiveInstance> _assignableLiveInstances;
private ClusterConfig _clusterConfig;
/**
@@ -61,10 +63,12 @@ class ResourceChangeSnapshot {
*/
ResourceChangeSnapshot() {
_changedTypes = new HashSet<>();
- _instanceConfigMap = new HashMap<>();
+ _allInstanceConfigMap = new HashMap<>();
+ _assignableInstanceConfigMap = new HashMap<>();
_idealStateMap = new HashMap<>();
_resourceConfigMap = new HashMap<>();
- _liveInstances = new HashMap<>();
+ _allLiveInstances = new HashMap<>();
+ _assignableLiveInstances = new HashMap<>();
_clusterConfig = null;
}
@@ -80,12 +84,16 @@ class ResourceChangeSnapshot {
ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
boolean ignoreNonTopologyChange) {
_changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
-
- _instanceConfigMap = ignoreNonTopologyChange ?
+ _allInstanceConfigMap = ignoreNonTopologyChange ?
dataProvider.getInstanceConfigMap().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
new HashMap<>(dataProvider.getInstanceConfigMap());
+ _assignableInstanceConfigMap = ignoreNonTopologyChange ?
+ dataProvider.getAssignableInstanceConfigMap().entrySet().parallelStream().collect(Collectors
+ .toMap(e -> e.getKey(),
+ e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
+ new HashMap<>(dataProvider.getAssignableInstanceConfigMap());
_idealStateMap = ignoreNonTopologyChange ?
dataProvider.getIdealStates().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
@@ -99,7 +107,8 @@ class ResourceChangeSnapshot {
_clusterConfig = ignoreNonTopologyChange ?
ClusterConfigTrimmer.getInstance().trimProperty(dataProvider.getClusterConfig()) :
dataProvider.getClusterConfig();
- _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
+ _allLiveInstances = new HashMap<>(dataProvider.getLiveInstances());
+ _assignableLiveInstances = new HashMap<>(dataProvider.getAssignableLiveInstances());
}
/**
@@ -108,10 +117,12 @@ class ResourceChangeSnapshot {
*/
ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
_changedTypes = new HashSet<>(snapshot._changedTypes);
- _instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
+ _allInstanceConfigMap = new HashMap<>(snapshot._allInstanceConfigMap);
+ _assignableInstanceConfigMap = new HashMap<>(snapshot._assignableInstanceConfigMap);
_idealStateMap = new HashMap<>(snapshot._idealStateMap);
_resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
- _liveInstances = new HashMap<>(snapshot._liveInstances);
+ _allLiveInstances = new HashMap<>(snapshot._allLiveInstances);
+ _assignableLiveInstances = new HashMap<>(snapshot._assignableLiveInstances);
_clusterConfig = snapshot._clusterConfig;
}
@@ -120,7 +131,11 @@ class ResourceChangeSnapshot {
}
Map<String, InstanceConfig> getInstanceConfigMap() {
- return _instanceConfigMap;
+ return _allInstanceConfigMap;
+ }
+
+ Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
+ return _assignableInstanceConfigMap;
}
Map<String, IdealState> getIdealStateMap() {
@@ -132,7 +147,11 @@ class ResourceChangeSnapshot {
}
Map<String, LiveInstance> getLiveInstances() {
- return _liveInstances;
+ return _allLiveInstances;
+ }
+
+ Map<String, LiveInstance> getAssignableLiveInstances() {
+ return _assignableLiveInstances;
}
ClusterConfig getClusterConfig() {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9dd517384..9120bd962 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -102,8 +102,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
// Property caches
private final PropertyCache<ResourceConfig> _resourceConfigCache;
- private final PropertyCache<InstanceConfig> _instanceConfigCache;
- private final PropertyCache<LiveInstance> _liveInstanceCache;
+ private final PropertyCache<InstanceConfig> _allInstanceConfigCache;
+ private final PropertyCache<LiveInstance> _allLiveInstanceCache;
private final PropertyCache<IdealState> _idealStateCache;
private final PropertyCache<ClusterConstraints> _clusterConstraintsCache;
private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
@@ -118,6 +118,12 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private Map<String, Map<String, String>> _idealStateRuleMap;
private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
+
+ // The map of assignable instances contains at most one instance with a given logicalId.
+ // This is used for SWAP related operations where there can be two instances with the same logicalId.
+ private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>();
+ private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>();
+ private final Set<String> _assignableDisabledInstanceSet = new HashSet<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
@@ -154,7 +160,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
return obj.getResourceName();
}
}, true);
- _liveInstanceCache = new PropertyCache<>(this, "LiveInstance", new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
+ _allLiveInstanceCache = new PropertyCache<>(this, "LiveInstance", new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
@Override
public PropertyKey getRootKey(HelixDataAccessor accessor) {
return accessor.keyBuilder().liveInstances();
@@ -170,7 +176,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
return obj.getInstanceName();
}
}, true);
- _instanceConfigCache = new PropertyCache<>(this, "InstanceConfig", new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
+ _allInstanceConfigCache = new PropertyCache<>(this, "InstanceConfig", new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
@Override
public PropertyKey getRootKey(HelixDataAccessor accessor) {
return accessor.keyBuilder().instanceConfigs();
@@ -310,7 +316,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private void refreshLiveInstances(final HelixDataAccessor accessor,
Set<HelixConstants.ChangeType> refreshedType) {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE).getAndSet(false)) {
- _liveInstanceCache.refresh(accessor);
+ _allLiveInstanceCache.refresh(accessor);
_updateInstanceOfflineTime = true;
refreshedType.add(HelixConstants.ChangeType.LIVE_INSTANCE);
} else {
@@ -323,10 +329,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private void refreshInstanceConfigs(final HelixDataAccessor accessor,
Set<HelixConstants.ChangeType> refreshedType) {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG).getAndSet(false)) {
- _instanceConfigCache.refresh(accessor);
+ _allInstanceConfigCache.refresh(accessor);
LogUtil.logInfo(logger, getClusterEventId(), String
.format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: %s", _clusterName,
- getPipelineName(), _instanceConfigCache.getPropertyMap().keySet()));
+ getPipelineName(), _allInstanceConfigCache.getPropertyMap().keySet()));
refreshedType.add(HelixConstants.ChangeType.INSTANCE_CONFIG);
} else {
LogUtil.logInfo(logger, getClusterEventId(), String
@@ -335,6 +341,108 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
}
+ /**
+ * Refreshes the assignable instances and SWAP related caches. This should be called after
+ * liveInstance and instanceConfig caches are refreshed. To determine what instances are
+ * assignable and live, it uses a combination of all instanceConfigs and liveInstances.
+ * TODO: Add EVACUATE InstanceOperation to be filtered out in assignable nodes.
+ *
+ * @param instanceConfigMap InstanceConfig map from instanceConfig cache
+ * @param liveInstancesMap LiveInstance map from liveInstance cache
+ * @param clusterConfig ClusterConfig from clusterConfig cache
+ */
+ private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
+ Map<String, LiveInstance> liveInstancesMap, ClusterConfig clusterConfig) {
+
+ if (clusterConfig == null) {
+ logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
+ return;
+ }
+
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+
+ // Clear all caches
+ _assignableInstanceConfigMap.clear();
+ _assignableLiveInstancesMap.clear();
+ _swapOutInstanceNameToSwapInInstanceName.clear();
+ _enabledLiveSwapInInstanceNames.clear();
+
+ Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
+ Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
+ Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+
+ for (Map.Entry<String, InstanceConfig> entry : instanceConfigMap.entrySet()) {
+ String node = entry.getKey();
+ InstanceConfig currentInstanceConfig = entry.getValue();
+
+ if (currentInstanceConfig == null) {
+ continue;
+ }
+
+ String currentInstanceLogicalId =
+ currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
+
+ // Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
+ // InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
+ // assignable. If there are duplicates with one node having no InstanceOperation and the other
+ // having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
+ // completion, therefore making the node assignable.
+ if (filteredInstancesByLogicalId.containsKey(currentInstanceLogicalId)) {
+ String filteredNode = filteredInstancesByLogicalId.get(currentInstanceLogicalId);
+ InstanceConfig filteredDuplicateInstanceConfig = instanceConfigMap.get(filteredNode);
+
+ if ((filteredDuplicateInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+ && currentInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+ || currentInstanceConfig.getInstanceOperation().isEmpty()) {
+ // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
+ // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
+ // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
+ // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
+ _assignableInstanceConfigMap.remove(filteredNode);
+ _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+ filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
+ }
+ } else {
+ _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+ filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
+ }
+
+ if (currentInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+ swapOutLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
+ currentInstanceLogicalId);
+ }
+
+ if (currentInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ swapInInstancesByLogicalId.put(
+ currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
+ currentInstanceConfig.getInstanceName());
+ }
+ }
+
+ liveInstancesMap.forEach((instanceName, liveInstance) -> {
+ if (_assignableInstanceConfigMap.containsKey(instanceName)) {
+ _assignableLiveInstancesMap.put(instanceName, liveInstance);
+ }
+ });
+
+ swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
+ String swapInInstanceName = swapInInstancesByLogicalId.get(value);
+ if (swapInInstanceName != null) {
+ _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
+ if (liveInstancesMap.containsKey(swapInInstanceName)
+ && InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
+ clusterConfig)) {
+ _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+ }
+ }
+ });
+ }
+
private void refreshResourceConfig(final HelixDataAccessor accessor,
Set<HelixConstants.ChangeType> refreshedType) {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG).getAndSet(false)) {
@@ -373,7 +481,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
timeOutWindow = clusterConfig.getOfflineNodeTimeOutForMaintenanceMode();
}
if (timeOutWindow >= 0 && isMaintenanceModeEnabled) {
- for (String instance : _liveInstanceCache.getPropertyMap().keySet()) {
+ for (String instance : _assignableLiveInstancesMap.keySet()) {
// 1. Check timed-out cache and don't do repeated work;
// 2. Check for nodes that didn't exist in the last iteration, because it has been checked;
// 3. For all other nodes, check if it's timed-out.
@@ -386,9 +494,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
}
if (isMaintenanceModeEnabled) {
- _liveInstanceExcludeTimedOutForMaintenance =
- _liveInstanceCache.getPropertyMap().entrySet().stream()
- .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
+ _liveInstanceExcludeTimedOutForMaintenance = _assignableLiveInstancesMap.entrySet().stream()
+ .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
@@ -421,6 +528,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
refreshIdealState(accessor, refreshedTypes);
refreshLiveInstances(accessor, refreshedTypes);
refreshInstanceConfigs(accessor, refreshedTypes);
+ updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+ _clusterConfig);
refreshResourceConfig(accessor, refreshedTypes);
_stateModelDefinitionCache.refresh(accessor);
_clusterConstraintsCache.refresh(accessor);
@@ -431,17 +540,19 @@ public class BaseControllerDataProvider implements ControlContextProvider {
updateOfflineInstanceHistory(accessor);
// Refresh derived data
- _instanceMessagesCache.refresh(accessor, _liveInstanceCache.getPropertyMap());
- _currentStateCache.refresh(accessor, _liveInstanceCache.getPropertyMap());
+ // Must use _liveInstanceCache instead of _assignableLiveInstancesMap because we need to
+ // know about the messages and current state of all instances including the SWAP_IN ones.
+ _instanceMessagesCache.refresh(accessor, _allLiveInstanceCache.getPropertyMap());
+ _currentStateCache.refresh(accessor, _allLiveInstanceCache.getPropertyMap());
// current state must be refreshed before refreshing relay messages
// because we need to use current state to validate all relay messages.
- _instanceMessagesCache.updateRelayMessages(_liveInstanceCache.getPropertyMap(),
+ _instanceMessagesCache.updateRelayMessages(_allLiveInstanceCache.getPropertyMap(),
_currentStateCache.getParticipantStatesMap());
updateIdealRuleMap(getClusterConfig());
- updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig());
- updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
+ updateDisabledInstances(getInstanceConfigMap().values(),
+ getAssignableInstanceConfigMap().values(),
getClusterConfig());
return refreshedTypes;
@@ -453,17 +564,18 @@ public class BaseControllerDataProvider implements ControlContextProvider {
"# of StateModelDefinition read from zk: " + getStateModelDefMap().size());
LogUtil.logDebug(logger, getClusterEventId(),
"# of ConstraintMap read from zk: " + getConstraintMap().size());
- LogUtil
- .logDebug(logger, getClusterEventId(), "LiveInstances: " + getLiveInstances().keySet());
- for (LiveInstance instance : getLiveInstances().values()) {
+ LogUtil.logDebug(logger, getClusterEventId(),
+ "AssignableLiveInstances: " + getAssignableLiveInstances().keySet());
+ for (LiveInstance instance : getAssignableLiveInstances().values()) {
LogUtil.logDebug(logger, getClusterEventId(),
- "live instance: " + instance.getInstanceName() + " " + instance.getEphemeralOwner());
+ "assignable live instance: " + instance.getInstanceName() + " "
+ + instance.getEphemeralOwner());
}
LogUtil.logDebug(logger, getClusterEventId(), "IdealStates: " + getIdealStates().keySet());
LogUtil.logDebug(logger, getClusterEventId(),
"ResourceConfigs: " + getResourceConfigMap().keySet());
LogUtil.logDebug(logger, getClusterEventId(),
- "InstanceConfigs: " + getInstanceConfigMap().keySet());
+ "AssignableInstanceConfigs: " + getAssignableInstanceConfigMap().keySet());
LogUtil.logDebug(logger, getClusterEventId(), "ClusterConfigs: " + getClusterConfig());
}
}
@@ -476,8 +588,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
_clusterConfig = clusterConfig;
refreshAbnormalStateResolverMap(_clusterConfig);
updateIdealRuleMap(_clusterConfig);
- updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
- updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
+ updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+ _clusterConfig);
+ updateDisabledInstances(getInstanceConfigMap().values(),
+ getAssignableInstanceConfigMap().values(),
_clusterConfig);
}
@@ -527,23 +641,57 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
/**
- * Returns the LiveInstances for each of the instances that are currently up and running,
+ * Returns the assignable LiveInstances for each of the instances that are currently up and running,
* excluding the instances that are considered offline during maintenance mode. Instances
* are timed-out if they have been offline for a while before going live during maintenance mode.
+ * @return A map of instance names to their LiveInstances
*/
- public Map<String, LiveInstance> getLiveInstances() {
+ public Map<String, LiveInstance> getAssignableLiveInstances() {
if (isMaintenanceModeEnabled()) {
- return _liveInstanceExcludeTimedOutForMaintenance;
+ return Collections.unmodifiableMap(_liveInstanceExcludeTimedOutForMaintenance);
}
- return _liveInstanceCache.getPropertyMap();
+ return Collections.unmodifiableMap(_assignableLiveInstancesMap);
+ }
+
+ /**
+ * Returns the LiveInstances for each of the instances that are currently up and running,
+ * excluding the instances that are considered offline during maintenance mode. Instances are
+ * timed-out if they have been offline for a while before going live during maintenance mode.
+ *
+ * @return A map of instance names to their LiveInstances
+ */
+ public Map<String, LiveInstance> getLiveInstances() {
+ return _allLiveInstanceCache.getPropertyMap();
+ }
+
+ /**
+ * Return the set of all assignable instance names.
+ *
+ * @return The set of assignable instance names
+ */
+ public Set<String> getAssignableInstances() {
+ return _assignableInstanceConfigMap.keySet();
}
/**
* Return the set of all instances names.
+ * @return The set of all instance names
*/
public Set<String> getAllInstances() {
- return _instanceConfigCache.getPropertyMap().keySet();
+ return _allInstanceConfigCache.getPropertyMap().keySet();
+ }
+
+ /**
+ * Return all the live nodes that are enabled and assignable
+ *
+ * @return A new set containing the names of assignable live instances that are marked enabled
+ */
+ public Set<String> getAssignableEnabledLiveInstances() {
+ Set<String> enabledLiveInstances = new HashSet<>(getAssignableLiveInstances().keySet());
+ enabledLiveInstances.removeAll(getDisabledInstances());
+
+ return enabledLiveInstances;
}
/**
@@ -552,22 +700,50 @@ public class BaseControllerDataProvider implements ControlContextProvider {
*/
public Set<String> getEnabledLiveInstances() {
Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
- enabledLiveInstances.removeAll(getDisabledInstances());
+ enabledLiveInstances.removeAll(getAssignableDisabledInstances());
return enabledLiveInstances;
}
+ /**
+ * Return all nodes that are enabled and assignable.
+ *
+ * @return A new set containing the names of assignable instances that are marked enabled
+ */
+ public Set<String> getAssignableEnabledInstances() {
+ Set<String> enabledNodes = new HashSet<>(getAssignableInstances());
+ enabledNodes.removeAll(getDisabledInstances());
+
+ return enabledNodes;
+ }
+
/**
* Return all nodes that are enabled.
- * @return
+ * @return A new set containing the names of instances that are marked enabled
*/
public Set<String> getEnabledInstances() {
Set<String> enabledNodes = new HashSet<>(getAllInstances());
- enabledNodes.removeAll(getDisabledInstances());
+ enabledNodes.removeAll(getAssignableDisabledInstances());
return enabledNodes;
}
+ /**
+ * Return all the live nodes that are enabled and assignable and tagged with given instanceTag.
+ *
+ * @param instanceTag The instance group tag.
+ * @return A new set containing the names of live instances that are marked enabled and have the
+ * specified tag.
+ */
+ public Set<String> getAssignableEnabledLiveInstancesWithTag(String instanceTag) {
+ Set<String> enabledLiveInstancesWithTag = new HashSet<>(getAssignableLiveInstances().keySet());
+ Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
+ enabledLiveInstancesWithTag.retainAll(instancesWithTag);
+ enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
+
+ return enabledLiveInstancesWithTag;
+ }
+
/**
* Return all the live nodes that are enabled and tagged with given instanceTag.
* @param instanceTag The instance group tag.
@@ -576,21 +752,38 @@ public class BaseControllerDataProvider implements ControlContextProvider {
*/
public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
Set<String> enabledLiveInstancesWithTag = new HashSet<>(getLiveInstances().keySet());
- Set<String> instancesWithTag = getInstancesWithTag(instanceTag);
+ Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
enabledLiveInstancesWithTag.retainAll(instancesWithTag);
enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
return enabledLiveInstancesWithTag;
}
+ /**
+ * Return all the nodes that are assignable and tagged with given instance tag.
+ *
+ * @param instanceTag The instance group tag.
+ */
+ public Set<String> getAssignableInstancesWithTag(String instanceTag) {
+ Set<String> taggedInstances = new HashSet<>();
+ for (String instance : _assignableInstanceConfigMap.keySet()) {
+ InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance);
+ if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
+ taggedInstances.add(instance);
+ }
+ }
+
+ return taggedInstances;
+ }
+
/**
* Return all the nodes that are tagged with given instance tag.
* @param instanceTag The instance group tag.
*/
public Set<String> getInstancesWithTag(String instanceTag) {
Set<String> taggedInstances = new HashSet<>();
- for (String instance : _instanceConfigCache.getPropertyMap().keySet()) {
- InstanceConfig instanceConfig = _instanceConfigCache.getPropertyByName(instance);
+ for (String instance : _assignableInstanceConfigMap.keySet()) {
+ InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance);
if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
taggedInstances.add(instance);
}
@@ -625,6 +818,15 @@ public class BaseControllerDataProvider implements ControlContextProvider {
return Collections.unmodifiableSet(_disabledInstanceSet);
}
+ /**
+ * Returns the set of assignable instances that are disabled.
+ *
+ * @return The set of disabled assignable instance names
+ */
+ public Set<String> getAssignableDisabledInstances() {
+ return Collections.unmodifiableSet(_assignableDisabledInstanceSet);
+ }
+
/**
* Get all swapping instance pairs.
*
@@ -644,7 +846,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
- _liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
+ _allLiveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
+ updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+ _clusterConfig);
_updateInstanceOfflineTime = true;
}
@@ -762,11 +966,20 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
/**
- * Returns the instance config map
- * @return
+ * Returns the instance config map for all assignable instances.
+ *
+ * @return a map of instance name to instance config
+ */
+ public Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
+ return Collections.unmodifiableMap(_assignableInstanceConfigMap);
+ }
+
+ /**
+ * Returns the instance config map for all instances.
+ * @return a map of instance name to instance config
*/
public Map<String, InstanceConfig> getInstanceConfigMap() {
- return _instanceConfigCache.getPropertyMap();
+ return _allInstanceConfigCache.getPropertyMap();
}
/**
@@ -774,9 +987,11 @@ public class BaseControllerDataProvider implements ControlContextProvider {
* @param instanceConfigMap
*/
public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
- _instanceConfigCache.setPropertyMap(instanceConfigMap);
- updateDisabledInstances(instanceConfigMap.values(), getClusterConfig());
- updateSwappingInstances(instanceConfigMap.values(), getEnabledLiveInstances(),
+ _allInstanceConfigCache.setPropertyMap(instanceConfigMap);
+ updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+ getClusterConfig());
+ updateDisabledInstances(getInstanceConfigMap().values(),
+ getAssignableInstanceConfigMap().values(),
getClusterConfig());
}
@@ -839,8 +1054,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
if (!_updateInstanceOfflineTime) {
return;
}
- List<String> offlineNodes = new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
- offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet());
+ List<String> offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
+ offlineNodes.removeAll(_allLiveInstanceCache.getPropertyMap().keySet());
_instanceOfflineTimeMap = new HashMap<>();
for (String instance : offlineNodes) {
@@ -866,15 +1081,18 @@ public class BaseControllerDataProvider implements ControlContextProvider {
_updateInstanceOfflineTime = false;
}
- private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
- ClusterConfig clusterConfig) {
+ private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfigs,
+ Collection<InstanceConfig> assignableInstanceConfigs, ClusterConfig clusterConfig) {
// Move the calculating disabled instances to refresh
_disabledInstanceForPartitionMap.clear();
_disabledInstanceSet.clear();
- for (InstanceConfig config : instanceConfigs) {
+ for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
_disabledInstanceSet.add(config.getInstanceName());
+ if (assignableInstanceConfigs.contains(config)) {
+ _assignableDisabledInstanceSet.add(config.getInstanceName());
+ }
}
for (String resource : disabledPartitionMap.keySet()) {
_disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
@@ -886,49 +1104,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
}
- private void updateSwappingInstances(Collection<InstanceConfig> instanceConfigs,
- Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
- _swapOutInstanceNameToSwapInInstanceName.clear();
- _enabledLiveSwapInInstanceNames.clear();
-
- if (clusterConfig == null) {
- logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
- return;
- }
-
- ClusterTopologyConfig clusterTopologyConfig =
- ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
-
- Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
- Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
- instanceConfigs.forEach(instanceConfig -> {
- if (instanceConfig == null) {
- return;
- }
- if (instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
- swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
- instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
- }
- if (instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
- swapInInstancesByLogicalId.put(
- instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
- instanceConfig.getInstanceName());
- }
- });
-
- swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
- String swapInInstanceName = swapInInstancesByLogicalId.get(value);
- if (swapInInstanceName != null) {
- _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
- if (liveEnabledInstances.contains(swapInInstanceName)) {
- _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
- }
- }
- });
- }
-
/*
* Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has
* been offline for longer than the user defined timeout window.
@@ -1101,10 +1276,16 @@ public class BaseControllerDataProvider implements ControlContextProvider {
protected StringBuilder genCacheContentStringBuilder() {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
+ sb.append(String.format("liveInstaceMap: %s", _allLiveInstanceCache.getPropertyMap()))
+ .append("\n");
+ sb.append(String.format("assignableLiveInstaceMap: %s", _assignableLiveInstancesMap))
+ .append("\n");
sb.append(String.format("idealStateMap: %s", _idealStateCache.getPropertyMap())).append("\n");
sb.append(String.format("stateModelDefMap: %s", _stateModelDefinitionCache.getPropertyMap())).append("\n");
- sb.append(String.format("instanceConfigMap: %s", _instanceConfigCache.getPropertyMap())).append("\n");
+ sb.append(String.format("instanceConfigMap: %s", _allInstanceConfigCache.getPropertyMap()))
+ .append("\n");
+ sb.append(String.format("assignableInstanceConfigMap: %s", _assignableInstanceConfigMap))
+ .append("\n");
sb.append(String.format("resourceConfigMap: %s", _resourceConfigCache.getPropertyMap())).append("\n");
sb.append(String.format("messageCache: %s", _instanceMessagesCache)).append("\n");
sb.append(String.format("currentStateCache: %s", _currentStateCache)).append("\n");
@@ -1113,7 +1294,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
protected PropertyCache<LiveInstance> getLiveInstanceCache() {
- return _liveInstanceCache;
+ return _allLiveInstanceCache;
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 96894e82d..3a71e777b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -26,7 +26,6 @@ import java.util.Set;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.model.CurrentState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.AbstractDataCache;
@@ -164,7 +163,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
*/
public void resetActiveTaskCount(CurrentStateOutput currentStateOutput) {
// init participant map
- for (String liveInstance : getLiveInstances().keySet()) {
+ for (String liveInstance : getAssignableLiveInstances().keySet()) {
_participantActiveTaskCount.put(liveInstance, 0);
}
// Active task == init and running tasks
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 1d8cb5d6c..7a23b8f28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -102,9 +102,10 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
Set<String> disabledInstancesForPartition =
cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
List<String> preferenceList = getPreferenceList(partition, idealState,
- Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
+ Collections.unmodifiableSet(cache.getAssignableLiveInstances().keySet()));
Map<String, String> bestStateForPartition =
- computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
+ computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(),
+ stateModelDef,
preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
cache.getClusterConfig(), partition,
cache.getAbnormalStateResolver(stateModelDefName), cache);
@@ -392,6 +393,12 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
* transition to the top-state, which could minimize the impact to the application's availability.
* To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and
* second-states with same priority and rely on the fact that Collections.sort() is stable.
+ * @param preferenceList List of instances the replica will be placed on
+ * @param stateModelDef State model definition
+ * @param currentStateMap Current state of each replica <instance: state>
+ * @param liveInstances Set of live instances
+ * @param disabledInstancesForPartition Set of disabled instances for the partition
+ * @param bestPossibleStateMap Output map of <instance: state> for the partition
*/
private void assignStatesToInstances(final List<String> preferenceList,
final StateModelDefinition stateModelDef, final Map<String, String> currentStateMap,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 78bbbba28..5c3ff3b9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -72,15 +72,15 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
LOG.error("State Model Definition null for resource: " + resourceName);
throw new HelixException("State Model Definition null for resource: " + resourceName);
}
- Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
- int replicas = currentIdealState.getReplicaCount(liveInstance.size());
+ Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
+ int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());
LinkedHashMap<String, Integer> stateCountMap = stateModelDef
- .getStateCountMap(liveInstance.size(), replicas);
- List<String> liveNodes = new ArrayList<>(liveInstance.keySet());
- List<String> allNodes = new ArrayList<>(clusterData.getAllInstances());
- allNodes.removeAll(clusterData.getDisabledInstances());
- liveNodes.retainAll(allNodes);
+ .getStateCountMap(assignableLiveInstance.size(), replicas);
+ List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet());
+ List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances());
+ assignableNodes.removeAll(clusterData.getDisabledInstances());
+ assignableLiveNodes.retainAll(assignableNodes);
Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
@@ -89,11 +89,11 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
Set<String> taggedNodes = new HashSet<String>();
Set<String> taggedLiveNodes = new HashSet<String>();
if (currentIdealState.getInstanceGroupTag() != null) {
- for (String instanceName : allNodes) {
- if (clusterData.getInstanceConfigMap().get(instanceName)
+ for (String instanceName : assignableNodes) {
+ if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
.containsTag(currentIdealState.getInstanceGroupTag())) {
taggedNodes.add(instanceName);
- if (liveInstance.containsKey(instanceName)) {
+ if (assignableLiveInstance.containsKey(instanceName)) {
taggedLiveNodes.add(instanceName);
}
}
@@ -114,25 +114,25 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+ " but no live participants have this tag");
}
- allNodes = new ArrayList<>(taggedNodes);
- liveNodes = new ArrayList<>(taggedLiveNodes);
+ assignableNodes = new ArrayList<>(taggedNodes);
+ assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
}
// sort node lists to ensure consistent preferred assignments
- Collections.sort(allNodes);
- Collections.sort(liveNodes);
+ Collections.sort(assignableNodes);
+ Collections.sort(assignableLiveNodes);
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
_rebalanceStrategy =
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
stateCountMap, maxPartition);
ZNRecord newMapping = _rebalanceStrategy
- .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
+ .computePartitionAssignment(assignableNodes, assignableLiveNodes, currentMapping, clusterData);
LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
- LOG.debug("liveNodes: {}", liveNodes);
- LOG.debug("allNodes: {}", allNodes);
+ LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
+ LOG.debug("assignableNodes: {}", assignableNodes);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newMapping: {}", newMapping);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 7ed2e70b0..939d94aed 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -127,7 +127,7 @@ public class CustomRebalancer extends AbstractRebalancer<ResourceControllerDataP
return instanceStateMap;
}
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ Map<String, LiveInstance> assignableLiveInstancesMap = cache.getAssignableLiveInstances();
for (String instance : idealStateMap.keySet()) {
boolean notInErrorState = currentStateMap != null
&& !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
@@ -135,7 +135,7 @@ public class CustomRebalancer extends AbstractRebalancer<ResourceControllerDataP
// Note: if instance is not live, the mapping for that instance will not show up in
// BestPossibleMapping (and ExternalView)
- if (liveInstancesMap.containsKey(instance) && notInErrorState) {
+ if (assignableLiveInstancesMap.containsKey(instance) && notInErrorState) {
if (enabled) {
instanceStateMap.put(instance, idealStateMap.get(instance));
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 442ddfb02..78793ddd9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
@@ -95,51 +94,46 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
}
}
- Set<String> liveEnabledNodes;
- Set<String> allNodes;
+ Set<String> assignableLiveEnabledNodes;
+ Set<String> assignableNodes;
String instanceTag = currentIdealState.getInstanceGroupTag();
if (instanceTag != null) {
- liveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
- allNodes = clusterData.getInstancesWithTag(instanceTag);
+ assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstancesWithTag(instanceTag);
+ assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
if (LOG.isInfoEnabled()) {
LOG.info(String.format(
"Found the following participants with tag %s for %s: "
+ "instances: %s, liveEnabledInstances: %s",
- currentIdealState.getInstanceGroupTag(), resourceName, allNodes, liveEnabledNodes));
+ currentIdealState.getInstanceGroupTag(), resourceName, assignableNodes, assignableLiveEnabledNodes));
}
} else {
- liveEnabledNodes = clusterData.getEnabledLiveInstances();
- allNodes = clusterData.getAllInstances();
+ assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstances();
+ assignableNodes = clusterData.getAssignableInstances();
}
- Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
- clusterData.getInstanceConfigMap(), allNodes);
- // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
- // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
- liveEnabledNodes.retainAll(allNodesDeduped);
-
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
- Set<String> activeNodes = DelayedRebalanceUtil
- .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), delay, clusterConfig);
+ Set<String> activeNodes =
+ DelayedRebalanceUtil.getActiveNodes(assignableNodes, currentIdealState, assignableLiveEnabledNodes,
+ clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
- offlineOrDisabledInstances.removeAll(liveEnabledNodes);
+ offlineOrDisabledInstances.removeAll(assignableLiveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
- clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), delay,
clusterConfig, _manager);
}
- if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
+ if (assignableNodes.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
- + "allInstances: %s, liveInstances: %s, activeInstances: %s",
- resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
+ + "allInstances: %s, liveInstances: %s, activeInstances: %s", resourceName, assignableNodes,
+ assignableLiveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
@@ -165,14 +159,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
stateCountMap, maxPartition);
- List<String> allNodeList = new ArrayList<>(allNodesDeduped);
+ List<String> allNodeList = new ArrayList<>(assignableNodes);
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = new ArrayList<>(
// We will not assign partitions to instances with EVACUATE InstanceOperation.
- DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
- liveEnabledNodes));
+ DelayedRebalanceUtil.filterOutEvacuatingInstances(
+ clusterData.getAssignableInstanceConfigMap(),
+ assignableLiveEnabledNodes));
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);
@@ -194,29 +189,16 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
_rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
clusterData);
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
- liveEnabledNodes, replicaCount, minActiveReplicas);
+ assignableLiveEnabledNodes, replicaCount, minActiveReplicas);
}
finalMapping.getListFields().putAll(userDefinedPreferenceList);
- // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
- Map<String, String> swapOutToSwapInInstancePairs =
- clusterData.getSwapOutToSwapInInstancePairs();
- // 2. Get all enabled and live SWAP_IN instances in the cluster.
- Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
- // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
- // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
- // swap occurring.
- if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
- DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
- swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
- }
-
LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
- LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
+ LOG.debug("assignableLiveEnabledNodes: {}", assignableLiveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
- LOG.debug("allNodes: {}", allNodesDeduped);
+ LOG.debug("assignableNodes: {}", assignableNodes);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
@@ -274,14 +256,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
LOG.debug("Processing resource:" + resource.getResourceName());
}
- Set<String> allNodes = cache.getEnabledInstances();
- Set<String> liveNodes = cache.getLiveInstances().keySet();
+ Set<String> allNodes = cache.getAssignableEnabledInstances();
+ Set<String> liveNodes = cache.getAssignableLiveInstances().keySet();
ClusterConfig clusterConfig = cache.getClusterConfig();
long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
- cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
+ cache.getAssignableLiveInstances().keySet(), cache.getAssignableInstanceConfigMap(),
+ delayTime,
clusterConfig);
String stateModelDefName = idealState.getStateModelDefRef();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index d4922b9d6..7750bd70b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -87,7 +87,7 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) {
// validate the instance configs
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
if (instanceConfigMap == null || !instanceConfigMap.keySet().containsAll(allNodes)) {
throw new HelixException(String.format("Config for instances %s is not found!",
allNodes.removeAll(instanceConfigMap.keySet())));
@@ -116,7 +116,8 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
if (!origPartitionMap.isEmpty()) {
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo =
- new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+ new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
+ clusterData.getClusterConfig());
// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
index b52ef9852..783af13f0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
@@ -154,7 +154,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
// Since instance weight will be replaced by constraint evaluation, record it in advance to avoid
// overwriting.
Map<String, Integer> instanceWeightRecords = new HashMap<>();
- for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) {
+ for (InstanceConfig instanceConfig : clusterData.getAssignableInstanceConfigMap().values()) {
if (instanceConfig.getWeight() != InstanceConfig.WEIGHT_NOT_SET) {
instanceWeightRecords.put(instanceConfig.getInstanceName(), instanceConfig.getWeight());
}
@@ -163,7 +163,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
List<String> candidates = new ArrayList<>(allNodes);
// Only calculate for configured nodes.
// Remove all non-configured nodes.
- candidates.retainAll(clusterData.getAllInstances());
+ candidates.retainAll(clusterData.getAssignableInstances());
// For generating the IdealState ZNRecord
Map<String, List<String>> preferenceList = new HashMap<>();
@@ -207,7 +207,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
// recover the original weight
for (String instanceName : instanceWeightRecords.keySet()) {
- clusterData.getInstanceConfigMap().get(instanceName)
+ clusterData.getAssignableInstanceConfigMap().get(instanceName)
.setWeight(instanceWeightRecords.get(instanceName));
}
@@ -297,7 +297,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
}
// Limit the weight to be at least MIN_INSTANCE_WEIGHT
for (int i = 0; i < instancePriority.length; i++) {
- clusterData.getInstanceConfigMap().get(qualifiedNodes.get(i))
+ clusterData.getAssignableInstanceConfigMap().get(qualifiedNodes.get(i))
.setWeight(instancePriority[i] - baseline + MIN_INSTANCE_WEIGHT);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index 38d47abbe..08bbaaffa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -75,7 +75,7 @@ public class CrushRebalanceStrategy implements RebalanceStrategy<ResourceControl
public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) throws HelixException {
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
Node topNode = _clusterTopo.getRootNode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index 0a7f4b67b..a53257f3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -82,7 +82,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy<Resou
public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) throws HelixException {
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
Node root = _clusterTopo.getRootNode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index c7066d053..f4fb26541 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -141,92 +141,6 @@ public class DelayedRebalanceUtil {
.collect(Collectors.toSet());
}
- /**
- * Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
- * InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
- * assignable. If there are duplicates with one node having no InstanceOperation and the other
- * having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
- * completion, therefore making the node assignable.
- * TODO: Eventually when we refactor DataProvider to have getAssignableInstances() and
- * TODO: getAssignableEnabledLiveInstances() this will not need to be called in the rebalancer.
- * @param clusterTopologyConfig the cluster topology configuration
- * @param instanceConfigMap the map of instance name to corresponding InstanceConfig
- * @param instances the set of instances to filter out duplicate logicalIDs for
- * @return the set of instances with duplicate logicalIDs filtered out, there will only be one
- * instance per logicalID
- */
- public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig clusterTopologyConfig, Map<String, InstanceConfig> instanceConfigMap,
- Set<String> instances) {
- Set<String> filteredNodes = new HashSet<>();
- Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
-
- instances.forEach(node -> {
- InstanceConfig thisInstanceConfig = instanceConfigMap.get(node);
- if (thisInstanceConfig == null) {
- return;
- }
- String thisLogicalId =
- thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
-
- if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) {
- InstanceConfig filteredDuplicateInstanceConfig =
- instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId));
- if ((filteredDuplicateInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
- && thisInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
- || thisInstanceConfig.getInstanceOperation().isEmpty()) {
- // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
- // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
- // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
- // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
- filteredNodes.remove(filteredInstancesByLogicalId.get(thisLogicalId));
- filteredNodes.add(node);
- filteredInstancesByLogicalId.put(thisLogicalId, node);
- }
- } else {
- filteredNodes.add(node);
- filteredInstancesByLogicalId.put(thisLogicalId, node);
- }
- });
-
- return filteredNodes;
- }
-
- /**
- * Look through the provided mapping and add corresponding SWAP_IN node if a SWAP_OUT node exists
- * in the partition's preference list.
- *
- * @param mapping the mapping to be updated (IdealState ZNRecord)
- * @param swapOutToSwapInInstancePairs the map of SWAP_OUT to SWAP_IN instances
- */
- public static void addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(ZNRecord mapping,
- Map<String, String> swapOutToSwapInInstancePairs, Set<String> enabledLiveSwapInInstances) {
- Map<String, List<String>> preferenceListsByPartition = mapping.getListFields();
- for (String partition : preferenceListsByPartition.keySet()) {
- List<String> preferenceList = preferenceListsByPartition.get(partition);
- if (preferenceList == null) {
- continue;
- }
- List<String> newInstancesToAdd = new ArrayList<>();
- for (String instanceName : preferenceList) {
- if (swapOutToSwapInInstancePairs.containsKey(instanceName)
- && enabledLiveSwapInInstances.contains(
- swapOutToSwapInInstancePairs.get(instanceName))) {
- String swapInInstanceName = swapOutToSwapInInstancePairs.get(instanceName);
- if (!preferenceList.contains(swapInInstanceName) && !newInstancesToAdd.contains(
- swapInInstanceName)) {
- newInstancesToAdd.add(swapInInstanceName);
- }
- }
- }
- if (!newInstancesToAdd.isEmpty()) {
- preferenceList.addAll(newInstancesToAdd);
- }
- }
- }
-
/**
* Return the time when an offline or disabled instance should be treated as inactive. Return -1
* if it is inactive now or forced to be rebalanced by an on-demand rebalance.
@@ -429,8 +343,8 @@ public class DelayedRebalanceUtil {
// keep all current assignment and add to allocated replicas
resourceAssignment.getMappedPartitions().forEach(partition ->
- resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
- allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+ resourceAssignment.getReplicaMap(partition).forEach((logicalId, state) ->
+ allocatedReplicas.computeIfAbsent(logicalId, key -> new HashSet<>())
.add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
partition.getPartitionName(), state, statePriorityMap.get(state)))));
// only proceed for resource requiring delayed rebalance overwrites
@@ -505,7 +419,7 @@ public class DelayedRebalanceUtil {
ResourceAssignment resourceAssignment) {
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
@@ -526,7 +440,7 @@ public class DelayedRebalanceUtil {
private static int getMinActiveReplica(ResourceControllerDataProvider clusterData, String resourceName) {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
index 6c199bc1b..d710425cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import com.google.common.collect.ImmutableSet;
+
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -30,12 +31,12 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.metrics.MetricCollector;
@@ -111,6 +112,7 @@ class GlobalRebalanceRunner implements AutoCloseable {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have content changed.
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
+ Set<String> allAssignableInstances = clusterData.getAssignableInstances();
if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
@@ -120,8 +122,8 @@ class GlobalRebalanceRunner implements AutoCloseable {
// If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
// be triggered again after baseline is finished.
// Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
- doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
- clusterChanges);
+ doGlobalRebalance(clusterData, resourceMap, allAssignableInstances, algorithm,
+ currentStateOutput, !waitForGlobalRebalance, clusterChanges);
} catch (HelixRebalanceException e) {
if (_asyncGlobalRebalanceEnabled) {
_rebalanceFailureCount.increment(1L);
@@ -150,9 +152,11 @@ class GlobalRebalanceRunner implements AutoCloseable {
* @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
* so the new Baseline could be applied to cluster.
*/
- private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
+ Map<String, Resource> resourceMap, Set<String> allAssignableInstances,
RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
+ throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
_baselineCalcLatency.startMeasuringLatency();
@@ -166,14 +170,7 @@ class GlobalRebalanceRunner implements AutoCloseable {
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
- // Dedupe and select correct node if there is more than one with the same logical id.
- // We should be calculating a new baseline only after a swap operation is complete and the SWAP_IN node is selected
- // by deduping. This ensures that adding the SWAP_IN node to the cluster does not cause new baseline to be calculated
- // with both the SWAP_OUT and SWAP_IN node.
- DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
- clusterData.getInstanceConfigMap(), clusterData.getAllInstances()),
- clusterChanges, currentBaseline);
+ allAssignableInstances, clusterChanges, currentBaseline);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 8f71f4e6d..c3049ebbf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,7 +47,6 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
@@ -303,18 +302,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
- Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
- clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
- // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
- // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
- Set<String> liveEnabledNodesDeduped = clusterData.getEnabledLiveInstances();
- liveEnabledNodesDeduped.retainAll(allNodesDeduped);
-
Set<String> activeNodes =
- DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, liveEnabledNodesDeduped,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+ DelayedRebalanceUtil.getActiveNodes(clusterData.getAssignableInstances(),
+ clusterData.getAssignableEnabledLiveInstances(),
+ clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), clusterData.getClusterConfig());
// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
@@ -369,19 +362,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
newIdealState.setPreferenceLists(
getPreferenceLists(assignments.get(resourceName), statePriorityMap));
- // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
- Map<String, String> swapOutToSwapInInstancePairs =
- clusterData.getSwapOutToSwapInInstancePairs();
- // 2. Get all enabled and live SWAP_IN instances in the cluster.
- Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
- // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
- // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
- // swap occurring.
- if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
- DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(
- newIdealState.getRecord(), swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
- }
-
// Note the state mapping in the new assignment won't directly propagate to the map fields.
// The rebalancer will calculate for the final state mapping considering the current states.
finalIdealStateMap.put(resourceName, newIdealState);
@@ -419,15 +399,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
Set<String> activeNodes,
Map<String, ResourceAssignment> currentResourceAssignment,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+
// the "real" live nodes at the time
- // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`.
- final Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
- clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
- final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
- // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
- // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
- enabledLiveInstances.retainAll(allNodesDeduped);
+ // TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
+ final Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
+ clusterData.getAssignableInstanceConfigMap(),
+ clusterData.getAssignableEnabledLiveInstances());
if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
@@ -622,12 +599,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
ClusterConfig clusterConfig = clusterData.getClusterConfig();
boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
- offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
+ offlineOrDisabledInstances.removeAll(clusterData.getAssignableEnabledLiveInstances());
for (String resource : resourceSet) {
DelayedRebalanceUtil
.setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
+ clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
clusterConfig, _manager);
}
} else {
@@ -642,13 +620,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
- clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
- Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
- // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
- // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
- enabledLiveInstances.retainAll(allNodesDeduped);
+ // TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
+ Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
+ clusterData.getAssignableInstanceConfigMap(),
+ clusterData.getAssignableEnabledLiveInstances());
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
index 913e04234..0fbd0ad3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
@@ -64,16 +64,16 @@ abstract class AbstractPartitionMovementConstraint extends SoftConstraint {
return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
}
- protected double calculateAssignmentScore(String nodeName, String state,
+ protected double calculateAssignmentScore(String logicalId, String state,
Map<String, String> instanceToStateMap) {
- if (instanceToStateMap.containsKey(nodeName)) {
+ if (instanceToStateMap.containsKey(logicalId)) {
// The score when the proposed allocation partially matches the assignment plan but will
// require a state transition.
double scoreWithStateTransitionCost =
MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
// if state matches, no state transition required for the proposed assignment; if state does
// not match, then the proposed assignment requires state transition.
- return state.equals(instanceToStateMap.get(nodeName)) ? MAX_SCORE
+ return state.equals(instanceToStateMap.get(logicalId)) ? MAX_SCORE
: scoreWithStateTransitionCost;
}
return MIN_SCORE;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
index 5e3fcd286..8063de34f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
@@ -44,7 +44,7 @@ public class BaselineInfluenceConstraint extends AbstractPartitionMovementConstr
Map<String, String> baselineAssignment =
getStateMap(replica, clusterContext.getBaselineAssignment());
- return calculateAssignmentScore(node.getInstanceName(), replica.getReplicaState(),
+ return calculateAssignmentScore(node.getLogicalId(), replica.getReplicaState(),
baselineAssignment);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 77d56302c..60d43764a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -148,10 +148,10 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
if (scoreCompareResult == 0) {
// If the evaluation scores of 2 nodes are the same, the algorithm assigns the replica
// to the idle node first.
- String instanceName1 = nodeEntry1.getKey().getInstanceName();
- String instanceName2 = nodeEntry2.getKey().getInstanceName();
- int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
- int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
+ String logicalId1 = nodeEntry1.getKey().getLogicalId();
+ String logicalId2 = nodeEntry2.getKey().getLogicalId();
+ int idleScore1 = busyInstances.contains(logicalId1) ? 0 : 1;
+ int idleScore2 = busyInstances.contains(logicalId2) ? 0 : 1;
return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
: -nodeEntry1.getKey().compareTo(nodeEntry2.getKey());
} else {
@@ -271,7 +271,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
/**
* @param assignments A collection of resource replicas assignment.
- * @return A set of instance names that have at least one replica assigned in the input assignments.
+ * @return A set of logicalIds that have at least one replica assigned in the input assignments.
*/
private Set<String> getBusyInstances(Collection<ResourceAssignment> assignments) {
return assignments.stream().flatMap(
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
index 08c135d66..05c7a94ed 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -40,14 +40,14 @@ public class PartitionMovementConstraint extends AbstractPartitionMovementConstr
getStateMap(replica, clusterContext.getBestPossibleAssignment());
Map<String, String> baselineAssignment =
getStateMap(replica, clusterContext.getBaselineAssignment());
- String nodeName = node.getInstanceName();
+ String logicalId = node.getLogicalId();
String state = replica.getReplicaState();
if (bestPossibleAssignment.isEmpty()) {
// if best possible is missing, it means the replica belongs to a newly added resource, so
// baseline assignment should be used instead.
- return calculateAssignmentScore(nodeName, state, baselineAssignment);
+ return calculateAssignmentScore(logicalId, state, baselineAssignment);
}
- return calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
+ return calculateAssignmentScore(logicalId, state, bestPossibleAssignment);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 3f1673210..a869a904e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -80,7 +80,8 @@ public class ClusterModelProvider {
Set<String> activeInstances,
Map<String, ResourceAssignment> resourceAssignment) {
return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
- Collections.emptyMap(), resourceAssignment, RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
+ Collections.emptyMap(), resourceAssignment,
+ RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
}
/**
@@ -162,8 +163,9 @@ public class ClusterModelProvider {
public static ClusterModel generateClusterModelFromExistingAssignment(
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
Map<String, ResourceAssignment> currentStateAssignment) {
- return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
- Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment,
+ return generateClusterModel(dataProvider, resourceMap,
+ dataProvider.getAssignableEnabledLiveInstances(), Collections.emptyMap(),
+ Collections.emptyMap(), currentStateAssignment,
RebalanceScopeType.GLOBAL_BASELINE);
}
@@ -187,11 +189,35 @@ public class ClusterModelProvider {
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> idealAssignment,
Map<String, ResourceAssignment> currentAssignment, RebalanceScopeType scopeType) {
+ Map<String, InstanceConfig> assignableInstanceConfigMap = dataProvider.getAssignableInstanceConfigMap();
// Construct all the assignable nodes and initialize with the allocated replicas.
Set<AssignableNode> assignableNodes =
- getAllAssignableNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+ getAllAssignableNodes(dataProvider.getClusterConfig(), assignableInstanceConfigMap,
activeInstances);
+ // Generate the logical view of the ideal assignment and the current assignment.
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(dataProvider.getClusterConfig());
+ Map<String, ResourceAssignment> logicalIdIdealAssignment =
+ idealAssignment.isEmpty() ? idealAssignment
+ : generateResourceAssignmentMapLogicalIdView(idealAssignment, clusterTopologyConfig,
+ dataProvider);
+ Map<String, ResourceAssignment> logicalIdCurrentAssignment =
+ currentAssignment.isEmpty() ? currentAssignment
+ : generateResourceAssignmentMapLogicalIdView(currentAssignment, clusterTopologyConfig,
+ dataProvider);
+
+ // Get the set of active logical ids.
+ Set<String> activeLogicalIds = activeInstances.stream().map(
+ instanceName -> assignableInstanceConfigMap.get(instanceName)
+ .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
+
+ Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
+ Set<String> assignableLiveInstanceLogicalIds =
+ assignableLiveInstanceNames.stream().map(
+ instanceName -> assignableInstanceConfigMap.get(instanceName)
+ .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
+
// Generate replica objects for all the resource partitions.
// <resource, replica set>
Map<String, Set<AssignableReplica>> replicaMap =
@@ -203,27 +229,28 @@ public class ClusterModelProvider {
Set<AssignableReplica> toBeAssignedReplicas;
switch (scopeType) {
case GLOBAL_BASELINE:
- toBeAssignedReplicas = findToBeAssignedReplicasByClusterChanges(replicaMap, activeInstances,
- dataProvider.getLiveInstances().keySet(), clusterChanges, currentAssignment,
+ toBeAssignedReplicas =
+ findToBeAssignedReplicasByClusterChanges(replicaMap, activeLogicalIds,
+ assignableLiveInstanceLogicalIds, clusterChanges, logicalIdCurrentAssignment,
allocatedReplicas);
break;
case PARTIAL:
// Filter to remove the replicas that do not exist in the ideal assignment given but exist
// in the replicaMap. This is because such replicas are new additions that do not need to be
// rebalanced right away.
- retainExistingReplicas(replicaMap, idealAssignment);
+ retainExistingReplicas(replicaMap, logicalIdIdealAssignment);
toBeAssignedReplicas =
- findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeInstances,
- idealAssignment, currentAssignment, allocatedReplicas);
+ findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeLogicalIds,
+ logicalIdIdealAssignment, logicalIdCurrentAssignment, allocatedReplicas);
break;
case EMERGENCY:
- toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
- currentAssignment, allocatedReplicas);
+ toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeLogicalIds,
+ logicalIdCurrentAssignment, allocatedReplicas);
break;
case DELAYED_REBALANCE_OVERWRITES:
toBeAssignedReplicas =
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(dataProvider, replicaMap.keySet(),
- activeInstances, currentAssignment, allocatedReplicas);
+ activeLogicalIds, logicalIdCurrentAssignment, allocatedReplicas);
break;
default:
throw new HelixException("Unknown rebalance scope type: " + scopeType);
@@ -231,18 +258,56 @@ public class ClusterModelProvider {
// Update the allocated replicas to the assignable nodes.
assignableNodes.parallelStream().forEach(node -> node.assignInitBatch(
- allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
+ allocatedReplicas.getOrDefault(node.getLogicalId(), Collections.emptySet())));
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
- assignableNodes, idealAssignment, currentAssignment);
+ assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment);
+
// Initial the cluster context with the allocated assignments.
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
}
+ private static Map<String, ResourceAssignment> generateResourceAssignmentMapLogicalIdView(
+ Map<String, ResourceAssignment> resourceAssignmentMap,
+ ClusterTopologyConfig clusterTopologyConfig, ResourceControllerDataProvider dataProvider) {
+
+ Map<String, InstanceConfig> allInstanceConfigMap = dataProvider.getInstanceConfigMap();
+
+ return resourceAssignmentMap.entrySet().parallelStream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+ String resourceName = entry.getKey();
+ ResourceAssignment instanceNameResourceAssignment = entry.getValue();
+ ResourceAssignment logicalIdResourceAssignment = new ResourceAssignment(resourceName);
+
+ StateModelDefinition stateModelDefinition = dataProvider.getStateModelDef(
+ dataProvider.getIdealState(resourceName).getStateModelDefRef());
+
+ instanceNameResourceAssignment.getMappedPartitions().forEach(partition -> {
+ Map<String, String> logicalIdStateMap = new HashMap<>();
+
+ instanceNameResourceAssignment.getReplicaMap(partition)
+ .forEach((instanceName, state) -> {
+ if (allInstanceConfigMap.containsKey(instanceName)) {
+ String logicalId = allInstanceConfigMap.get(instanceName)
+ .getLogicalId(clusterTopologyConfig.getEndNodeType());
+ if (!logicalIdStateMap.containsKey(logicalId) || state.equals(
+ stateModelDefinition.getTopState())) {
+ logicalIdStateMap.put(logicalId, state);
+ }
+ }
+ });
+
+ logicalIdResourceAssignment.addReplicaMap(partition, logicalIdStateMap);
+ });
+
+ return logicalIdResourceAssignment;
+ }));
+ }
+
// Filter the replicas map so only the replicas that have been allocated in the existing
// assignmentMap remain in the map.
private static void retainExistingReplicas(Map<String, Set<AssignableReplica>> replicaMap,
@@ -399,8 +464,11 @@ public class ClusterModelProvider {
Set<String> newlyConnectedNodes = clusterChanges
.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, Collections.emptySet());
newlyConnectedNodes.retainAll(liveInstances);
- if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges
- .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG) || !newlyConnectedNodes.isEmpty()) {
+
+ if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG)
+ || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)
+ || !newlyConnectedNodes.isEmpty()) {
+
// 1. If the cluster topology has been modified, need to reassign all replicas.
// 2. If any node was newly connected, need to rebalance all replicas for the evenness of
// distribution.
@@ -419,7 +487,7 @@ public class ClusterModelProvider {
.getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
.contains(resourceName) || !currentAssignment.containsKey(resourceName)) {
toBeAssignedReplicas.addAll(replicas);
- continue; // go to check next resource
+ // go to check next resource
} else {
// check for every replica assignment to identify if the related replicas need to be reassigned.
// <partition, <state, instances list>>
@@ -433,16 +501,15 @@ public class ClusterModelProvider {
if (validInstances.isEmpty()) {
// 3. if no such an instance in the current assignment, need to reassign the replica
toBeAssignedReplicas.add(replica);
- continue; // go to check the next replica
} else {
Iterator<String> iter = validInstances.iterator();
// Remove the instance from the current allocation record after processing so that it
// won't be double-processed as we loop through all replicas
- String instanceName = iter.next();
+ String logicalId = iter.next();
iter.remove();
// the current assignment for this replica is valid,
// add to the allocated replica list.
- allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
+ allocatedReplicas.computeIfAbsent(logicalId, key -> new HashSet<>()).add(replica);
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8223d9a36..8ec4b4475 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -22,9 +22,11 @@ package org.apache.helix.controller.stages;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
@@ -35,6 +37,7 @@ import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
@@ -86,9 +89,16 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
final BestPossibleStateOutput bestPossibleStateOutput =
compute(event, resourceMap, currentStateOutput);
+
+ // Add swap-in instances to bestPossibleStateOutput.
+ // We do this after computing the best possible state output because rebalance algorithms should
+ // not be aware of swap-in instances. We simply add the swap-in instances to the
+ // stateMap where the swap-out instance is and compute the correct state.
+ addSwapInInstancesToBestPossibleState(resourceMap, bestPossibleStateOutput, cache);
+
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
- final Map<String, InstanceConfig> instanceConfigMap = cache.getInstanceConfigMap();
+ final Map<String, InstanceConfig> allInstanceConfigMap = cache.getInstanceConfigMap();
final Map<String, StateModelDefinition> stateModelDefMap = cache.getStateModelDefMap();
final Map<String, IdealState> idealStateMap = cache.getIdealStates();
final Map<String, ExternalView> externalViewMap = cache.getExternalViews();
@@ -96,8 +106,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
asyncExecute(cache.getAsyncTasksThreadPool(), () -> {
try {
if (clusterStatusMonitor != null) {
- clusterStatusMonitor
- .setPerInstanceResourceStatus(bestPossibleStateOutput, instanceConfigMap, resourceMap,
+ clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+ allInstanceConfigMap, resourceMap,
stateModelDefMap);
for (String resourceName : idealStateMap.keySet()) {
@@ -121,6 +131,52 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
});
}
+ private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourceMap,
+ BestPossibleStateOutput bestPossibleStateOutput, ResourceControllerDataProvider cache) {
+ // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
+ Map<String, String> swapOutToSwapInInstancePairs = cache.getSwapOutToSwapInInstancePairs();
+ // 2. Get all enabled and live SWAP_IN instances in the cluster.
+ Set<String> enabledLiveSwapInInstances = cache.getEnabledLiveSwapInInstanceNames();
+ // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
+ // Skipping this when there are no SWAP_IN instances ready (enabled and live) will reduce computation time when there is no active
+ // swap occurring.
+ if (!enabledLiveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
+ resourceMap.forEach((resourceName, resource) -> {
+ StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+ bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
+ .forEach((partition, stateMap) -> {
+ Set<String> commonInstances = new HashSet<>(stateMap.keySet());
+ commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
+
+ commonInstances.forEach(swapOutInstance -> {
+ if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
+ if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef,
+ stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) {
+ // If the swap-out instance's replica is a topState and the StateModel allows for
+ // another replica with the topState to be added, set the swap-in instance's replica
+ // to the topState.
+ stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+ stateModelDef.getTopState());
+ } else {
+ // If the swap-out instance's replica is a topState and the StateModel does not allow for
+ // another replica with the topState to be added, set the swap-in instance's replica
+ // to the secondTopState.
+ stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+ stateModelDef.getSecondTopStates().iterator().next());
+ }
+ } else if (stateModelDef.getSecondTopStates()
+ .contains(stateMap.get(swapOutInstance))) {
+ // If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
+ // to the same secondTopState.
+ stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+ stateMap.get(swapOutInstance));
+ }
+ });
+ });
+ });
+ }
+ }
+
private void reportResourceState(ClusterStatusMonitor clusterStatusMonitor,
BestPossibleStateOutput bestPossibleStateOutput, String resourceName, IdealState is,
ExternalView ev, StateModelDefinition stateModelDef) {
@@ -239,7 +295,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
final HelixManager manager) {
int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
if (maxOfflineInstancesAllowed >= 0) {
- int offlineCount = cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
+ int offlineCount =
+ cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
if (offlineCount > maxOfflineInstancesAllowed) {
String errMsg = String.format(
"Offline Instances count %d greater than allowed count %d. Put cluster %s into "
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 477e4f99b..b3990046c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -551,6 +551,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
messagesThrottled.add(messageToThrottle.getId());
return;
}
+ // TODO: Currently throttling is applied for messages that are targeting all instances including those not considered as
+ // assignable. They all share the same configured limits. After discussion, there was agreement that this is the proper
+ // behavior. In addition to this, we should consider adding priority based on whether the instance is assignable and whether
+ // the message is bringing replica count to configured replicas or above configured replicas.
throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition,
messageToThrottle, messagesThrottled, RebalanceType.LOAD_BALANCE, cache,
resourceMessageMap);
@@ -656,10 +660,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
// preference list
if (preferenceList != null) {
- return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+ return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(
+ i -> resourceControllerDataProvider.getAssignableEnabledLiveInstances().contains(i))
.count(), requiredNumReplica); // StateModelDefinition's counts
}
- return stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(),
+ return stateModelDefinition.getStateCountMap(
+ resourceControllerDataProvider.getAssignableEnabledLiveInstances().size(),
requiredNumReplica); // StateModelDefinition's counts
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
index e4a7e12aa..d262d1402 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
@@ -90,7 +90,7 @@ public class MaintenanceRecoveryStage extends AbstractAsyncBaseStage {
}
// Get the count of all instances that are either offline or disabled
int offlineDisabledCount =
- cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
+ cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit;
reason = String.format(
"Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 8a7ae52b5..5c22c11db 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -266,7 +266,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
LogUtil.logError(logger, _eventId, String.format(
"An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s",
sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition),
- instanceStateMap, cache.getAllInstances(), cache.getLiveInstances().keySet(),
+ instanceStateMap, cache.getAllInstances(),
+ cache.getLiveInstances().keySet(),
message));
continue; // Do not add this message
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index a7e9742e0..7e8bde9d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1f77fa66a..00b2fd71b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,12 +19,10 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixProperty;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index dbedf7bcc..f9662101c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConstants;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 34bd56487..7a0fe6377 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -643,16 +643,22 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- // 1. Check that both instances are alive.
+ // 1. Check that both instances are alive and enabled.
LiveInstance swapOutLiveInstance =
accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
LiveInstance swapInLiveInstance =
accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
- if (swapOutLiveInstance == null || swapInLiveInstance == null) {
+ InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
+ InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
+ if (swapOutLiveInstance == null || swapInLiveInstance == null
+ || !swapOutInstanceConfig.getInstanceEnabled()
+ || !swapInInstanceConfig.getInstanceEnabled()) {
logger.warn(
- "SwapOutInstance {} is {} and SwapInInstance {} is {} for cluster {}. Swap will not complete unless both instances are ONLINE.",
+ "SwapOutInstance {} is {} + {} and SwapInInstance {} is {} + {} for cluster {}. Swap will not complete unless both instances are ONLINE.",
swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
- swapInInstanceName, swapInLiveInstance != null ? "ONLINE" : "OFFLINE", clusterName);
+ swapOutInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", swapInInstanceName,
+ swapInLiveInstance != null ? "ONLINE" : "OFFLINE",
+ swapInInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", clusterName);
return false;
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 4b56057cd..b347cea03 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -39,7 +39,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
* can be in s1.
*/
public class ResourceAssignment extends HelixProperty {
-
/**
* Initialize an empty mapping
* @param resourceName the resource being mapped
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 8da61958c..819ee5d72 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -647,7 +647,8 @@ public abstract class AbstractTaskDispatcher {
int jobCfgLimitation =
jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
// 2. throttled by participant capacity
- int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
+ int participantCapacity =
+ cache.getAssignableInstanceConfigMap().get(instance).getMaxConcurrentTask();
if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 4b0e00f81..fd22a8e1f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -141,8 +141,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// Will contain the list of partitions that must be explicitly dropped from the ideal state that
// is stored in zk.
Set<String> liveInstances =
- jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
- : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+ jobCfg.getInstanceGroupTag() == null ? _dataProvider.getAssignableEnabledLiveInstances()
+ : _dataProvider.getAssignableEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job: {}", jobName);
@@ -163,7 +163,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
if (jobTgtState == TargetState.STOP) {
// If the assigned instance is no longer live, so mark it as DROPPED in the context
markPartitionsWithoutLiveInstance(jobCtx, liveInstances);
-
+
if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
workflowCtx.setJobState(jobName, TaskState.STOPPED);
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index da9324acf..c7d46b8e4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -146,7 +146,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
cache.refresh(_accessor);
List<String> liveInstances = new ArrayList<String>();
- liveInstances.addAll(cache.getLiveInstances().keySet());
+ liveInstances.addAll(cache.getAssignableLiveInstances().keySet());
boolean success = verifyLiveNodes(liveInstances);
if (!success) {
LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index ca47c16b9..d0da9ba8e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -330,16 +330,16 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
Map<String, Map<String, String>> idealPartitionState = new HashMap<>();
for (String partition : idealState.getPartitionSet()) {
- List<String> preferenceList = AbstractRebalancer
- .getPreferenceList(new Partition(partition), idealState, cache.getEnabledLiveInstances());
+ List<String> preferenceList = AbstractRebalancer.getPreferenceList(new Partition(partition),
+ idealState, cache.getAssignableEnabledLiveInstances());
Map<String, String> idealMapping;
if (_isDeactivatedNodeAware) {
- idealMapping = HelixUtil
- .computeIdealMapping(preferenceList, stateModelDef, cache.getLiveInstances().keySet(),
+ idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
+ cache.getAssignableLiveInstances().keySet(),
cache.getDisabledInstancesForPartition(idealState.getResourceName(), partition));
} else {
- idealMapping = HelixUtil
- .computeIdealMapping(preferenceList, stateModelDef, cache.getEnabledLiveInstances(),
+ idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
+ cache.getAssignableEnabledLiveInstances(),
Collections.emptySet());
}
idealPartitionState.put(partition, idealMapping);
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 88c33f608..4a3d49b73 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -294,9 +294,9 @@ public final class HelixUtil {
.collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
// For LiveInstances, we must preserve the existing session IDs
// So read LiveInstance objects from the cluster and do a "retainAll" on them
- // liveInstanceMap is an unmodifiableMap instances, so we filter using a stream
- Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
- List<LiveInstance> filteredLiveInstances = liveInstanceMap.entrySet().stream()
+ // assignableLiveInstanceMap is an unmodifiable Map instance, so we filter using a stream
+ Map<String, LiveInstance> assignableLiveInstanceMap = dataProvider.getAssignableLiveInstances();
+ List<LiveInstance> filteredLiveInstances = assignableLiveInstanceMap.entrySet().stream()
.filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
.collect(Collectors.toList());
// Synthetically create LiveInstance objects that are passed in as the parameter
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
index 80679310a..6f6cdc0a7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
@@ -111,11 +111,11 @@ public class TestHelixPropoertyTimmer {
ResourceControllerDataProvider dataProvider =
Mockito.mock(ResourceControllerDataProvider.class);
when(dataProvider.getRefreshedChangeTypes()).thenReturn(changeTypes);
- when(dataProvider.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(dataProvider.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(dataProvider.getIdealStates()).thenReturn(idealStateMap);
when(dataProvider.getResourceConfigMap()).thenReturn(resourceConfigMap);
when(dataProvider.getClusterConfig()).thenReturn(clusterConfig);
- when(dataProvider.getLiveInstances()).thenReturn(Collections.emptyMap());
+ when(dataProvider.getAssignableLiveInstances()).thenReturn(Collections.emptyMap());
return dataProvider;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 32a131d48..905c5552b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -243,7 +243,7 @@ public class TestAutoRebalanceStrategy {
}
}
Map<String, String> assignment = new AutoRebalancer()
- .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
+ .computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(), _stateModelDef,
preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
mapResult.put(partition, assignment);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 000978ef1..951e0e3c5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,7 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -108,16 +108,21 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
_instances.add(instanceName);
// 1. Set up the default instance information with capacity configuration.
InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
- Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
instanceConfigMap.put(instanceName, testInstanceConfig);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
// 2. Mock the live instance node for the default instance.
LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
- Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
liveInstanceMap.put(instanceName, testLiveInstance);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAssignableInstances()).thenReturn(_instances);
when(testCache.getAllInstances()).thenReturn(_instances);
}
@@ -370,7 +375,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Collectors.toMap(resourceName -> resourceName, Resource::new));
try {
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
- clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
+ clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -434,7 +439,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Calculation will fail
try {
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
- clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
+ clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -601,6 +606,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
String offlinePartition = _partitionNames.get(0);
String offlineState = "MASTER";
String offlineInstance = "offlineInstance";
+ InstanceConfig offlineInstanceConfig = createMockInstanceConfig(offlineInstance);
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
+ instanceConfigMap.put(offlineInstance, offlineInstanceConfig);
+ when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
for (Partition partition : bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
if (partition.getPartitionName().equals(offlinePartition)) {
bestPossibleAssignment.get(offlineResource)
@@ -649,12 +659,13 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Set<String> instances = new HashSet<>(_instances);
String offlineInstance = "offlineInstance";
instances.add(offlineInstance);
- when(clusterData.getAllInstances()).thenReturn(instances);
+ when(clusterData.getAssignableInstances()).thenReturn(instances);
Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+ when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
// Set minActiveReplica to 0 so that requireRebalanceOverwrite returns false
@@ -737,15 +748,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// force create a fake offlineInstance that's in delay window
Set<String> instances = new HashSet<>(_instances);
instances.add(offlineInstance);
- when(clusterData.getAllInstances()).thenReturn(instances);
- when(clusterData.getEnabledInstances()).thenReturn(instances);
- when(clusterData.getEnabledLiveInstances()).thenReturn(
+ when(clusterData.getAssignableInstances()).thenReturn(instances);
+ when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
+ when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(
new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+ when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
Map<String, IdealState> isMap = new HashMap<>();
@@ -881,10 +893,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// force create a fake offlineInstance that's in delay window
Set<String> instances = new HashSet<>(_instances);
- when(clusterData.getAllInstances()).thenReturn(instances);
- when(clusterData.getEnabledInstances()).thenReturn(instances);
- when(clusterData.getEnabledLiveInstances()).thenReturn(instances);
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ when(clusterData.getAssignableInstances()).thenReturn(instances);
+ when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
+ when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(instances);
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
+ when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
Map<String, IdealState> isMap = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index bdc677bde..c5c7b560c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -256,16 +256,21 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
_instances.add(instanceName);
// 1. Set up the default instance information with capacity configuration.
InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
- Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
instanceConfigMap.put(instanceName, testInstanceConfig);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
// 2. Mock the live instance node for the default instance.
LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
- Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
liveInstanceMap.put(instanceName, testLiveInstance);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAssignableInstances()).thenReturn(_instances);
when(testCache.getAllInstances()).thenReturn(_instances);
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
index 16c199470..9b4d1de15 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -53,6 +53,7 @@ public class TestPartitionMovementConstraint {
when(_testReplica.getResourceName()).thenReturn(RESOURCE);
when(_testReplica.getPartitionName()).thenReturn(PARTITION);
when(_testNode.getInstanceName()).thenReturn(INSTANCE);
+ when(_testNode.getLogicalId()).thenReturn(INSTANCE);
}
@Test
@@ -104,6 +105,7 @@ public class TestPartitionMovementConstraint {
// when the replica's state matches with best possible, allocation matches with baseline
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameA);
+ when(testAssignableNode.getLogicalId()).thenReturn(instanceNameA);
when(_testReplica.getReplicaState()).thenReturn("Master");
verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
0.5, 0.5);
@@ -112,6 +114,7 @@ public class TestPartitionMovementConstraint {
// when the replica's allocation matches with best possible only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameB);
+ when(testAssignableNode.getLogicalId()).thenReturn(instanceNameB);
when(_testReplica.getReplicaState()).thenReturn("Master");
verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
0.0, 0.0);
@@ -120,6 +123,7 @@ public class TestPartitionMovementConstraint {
// when the replica's state matches with baseline only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
+ when(testAssignableNode.getLogicalId()).thenReturn(instanceNameC);
when(_testReplica.getReplicaState()).thenReturn("Master");
verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
1.0, 1.0);
@@ -128,6 +132,7 @@ public class TestPartitionMovementConstraint {
// when the replica's allocation matches with baseline only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
+ when(testAssignableNode.getLogicalId()).thenReturn(instanceNameC);
when(_testReplica.getReplicaState()).thenReturn("Slave");
verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
0.5, 0.5);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index b9c4ce39d..c9deb792d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -103,6 +103,7 @@ public abstract class AbstractTestClusterModel {
testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
// 2. Set up the basic cluster configuration.
@@ -121,7 +122,7 @@ public abstract class AbstractTestClusterModel {
LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
liveInstanceMap.put(_testInstanceId, testLiveInstance);
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
// 4. Mock two resources, each with 2 partitions on the default instance.
// The instance will have the following partitions assigned
@@ -288,7 +289,7 @@ public abstract class AbstractTestClusterModel {
protected Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
Set<AssignableNode> nodeSet = new HashSet<>();
- testCache.getInstanceConfigMap().values().forEach(config -> nodeSet
+ testCache.getAssignableInstanceConfigMap().values().forEach(config -> nodeSet
.add(new AssignableNode(testCache.getClusterConfig(), config, config.getInstanceName())));
return nodeSet;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index e13fb3979..d63f05c81 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -63,7 +63,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put(TEST_INSTANCE_ID_1, testInstanceConfig1);
instanceConfigMap.put(TEST_INSTANCE_ID_2, testInstanceConfig2);
- when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
Set<AssignableNode> assignableNodes = generateNodes(testCache);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 9cbfc2560..0f751b186 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -64,7 +64,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
expectedCapacityMap.put("item3", 30);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
assignableNode.assignInitBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
@@ -177,7 +177,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
ResourceControllerDataProvider testCache = setupClusterDataCache();
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
"MASTER", 1);
@@ -192,7 +192,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
assignableNode.assignInitBatch(assignmentSet);
AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
@@ -213,10 +213,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
testInstanceConfig.setDomain("instance=testInstance");
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
- when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
AssignableNode node = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(node.getFaultZone(), "Helix_default_zone");
}
@@ -234,10 +234,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
testInstanceConfig.setDomain("zone=2, instance=testInstance");
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
- when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "2");
@@ -251,10 +251,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
testInstanceConfig.setDomain("zone=2, instance=testInstance");
instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
- when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
@@ -268,10 +268,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
- when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 500cfb0b6..34582d600 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -79,14 +79,15 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
_instances.add(instanceName);
// 1. Set up the default instance information with capacity configuration.
InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
- Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
instanceConfigMap.put(instanceName, testInstanceConfig);
+ when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
// 2. Mock the live instance node for the default instance.
LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
- Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
liveInstanceMap.put(instanceName, testLiveInstance);
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
}
return testCache;
@@ -104,8 +105,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
Set<String> activeInstances = new HashSet<>();
activeInstances.add(instance1);
activeInstances.add(instance2);
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
// test 0, empty input
Assert.assertEquals(
@@ -142,8 +143,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// test 2, no additional replica to be assigned
testCache = setupClusterDataCache();
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
@@ -167,8 +168,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// test 3, minActiveReplica==2, two partitions falling short
testCache = setupClusterDataCache();
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
@@ -205,8 +206,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
Set<String> activeInstances = new HashSet<>();
activeInstances.add(instance1);
activeInstances.add(instance2);
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
// test 1, one partition under minActiveReplica
Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
@@ -245,8 +246,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// test 2, minActiveReplica==2, three partitions falling short
testCache = setupClusterDataCache();
- when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
@@ -413,7 +414,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
.allMatch(replicaSet -> replicaSet.size() == 4));
// Adjust instance fault zone, so they have different fault zones.
- testCache.getInstanceConfigMap().values().stream()
+ testCache.getAssignableInstanceConfigMap().values().stream()
.forEach(config -> config.setZoneId(config.getInstanceName()));
clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
_resourceNames.stream()
@@ -576,7 +577,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
// Adjust instance fault zone, so they have different fault zones.
- testCache.getInstanceConfigMap().values().stream()
+ testCache.getAssignableInstanceConfigMap().values().stream()
.forEach(config -> config.setZoneId(config.getInstanceName()));
// 2. test with a pair of identical best possible assignment and baseline assignment
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 4d8fb8669..0027f8e4e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -49,8 +49,9 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
"testResourceName"
};
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
- setupLiveInstances(5);
setupStateModel();
+ setupInstances(5);
+ setupLiveInstances(5);
Map<String, Resource> resourceMap = getResourceMap();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 7289dc3f8..e33dc9f5d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -50,6 +50,7 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
BuiltInStateModelDefinitions.MasterSlave.name());
setupLiveInstances(5);
setupStateModel();
+ setupInstances(5);
Map<String, Resource> resourceMap =
getResourceMap(resources, numPartition, BuiltInStateModelDefinitions.MasterSlave.name());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index 2fcfc3e59..7b891522c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.LiveInstance;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 60281e65b..7da3d64c2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -572,6 +572,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
setupStateModel();
+ setupInstances(numOfLiveInstances);
setupLiveInstances(numOfLiveInstances);
// Set up cluster configs
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 40fa82030..d3c41018c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -57,8 +57,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
@Test
public void testDuplicateMsg() throws Exception {
String clusterName = "CLUSTER_" + _className + "_dup";
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ admin.addCluster(clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
refreshClusterConfig(clusterName, accessor);
@@ -82,10 +84,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setupIdealState(clusterName, new int[] {
0
}, resourceGroups, 1, 1);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[]{0});
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
0
});
- setupStateModel(clusterName);
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
@@ -321,10 +324,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setupIdealState(clusterName, new int[] {
0
}, resourceGroups, 1, 1);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[]{0});
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
0
});
- setupStateModel(clusterName);
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
@@ -395,8 +399,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
@Test
public void testMasterXfer() throws Exception {
String clusterName = "CLUSTER_" + _className + "_xfer";
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ admin.addCluster(clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
@@ -417,10 +423,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setupIdealState(clusterName, new int[] {
0, 1
}, resourceGroups, 1, 2);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[]{0, 1});
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
1
});
- setupStateModel(clusterName);
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
@@ -474,8 +481,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
@Test
public void testNoDuplicatedMaster() throws Exception {
String clusterName = "CLUSTER_" + _className + "_no_duplicated_master";
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ admin.addCluster(clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
@@ -496,10 +505,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setupIdealState(clusterName, new int[] {
0, 1
}, resourceGroups, 1, 2);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[]{0, 1});
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
0, 1
});
- setupStateModel(clusterName);
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
@@ -553,6 +563,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
public void testNoMessageSentOnControllerLeadershipLoss() throws Exception {
String methodName = TestHelper.getTestMethodName();
String clusterName = _className + "_" + methodName;
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+
+ admin.addCluster(clusterName);
final String resourceName = "testResource_" + methodName;
final String partitionName = resourceName + "_0";
@@ -565,10 +578,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
setupIdealState(clusterName, new int[] {
0
}, resourceGroups, 1, 1);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[]{0});
List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
0
});
- setupStateModel(clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index af47542ee..e4aeed04f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -71,7 +71,7 @@ public class TestReplicaLevelThrottling extends BaseStageTest {
when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
(StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
- when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
+ when(mock.cache.getAssignableEnabledLiveInstances()).thenReturn(new HashSet<>(
((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
index 742020b20..d39dd1fd2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
@@ -208,15 +208,15 @@ public class TestOfflineNodeTimeoutDuringMaintenanceMode extends ZkTestBase {
new ResourceControllerDataProvider(CLUSTER_NAME);
resourceControllerDataProvider.refresh(_helixDataAccessor);
Assert
- .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance3));
+ .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance3));
Assert
- .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance4));
- Assert.assertTrue(resourceControllerDataProvider.getLiveInstances().containsKey(instance5));
+ .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance4));
+ Assert.assertTrue(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance5));
Assert
- .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance6));
+ .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance6));
Assert
- .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance7));
- Assert.assertTrue(resourceControllerDataProvider.getLiveInstances().containsKey(instance8));
+ .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance7));
+ Assert.assertTrue(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance8));
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index a324a3300..f01af8a7f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -208,7 +208,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
dataCache.refresh(_accessor);
- Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
LiveInstance liveInstance = liveInstanceMap.get(instance);
Map<String, CurrentState> currentStateMap =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index 67d02a421..33cdbd378 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -192,7 +192,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
private void verifyP2PDisabled() {
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
dataCache.refresh(_accessor);
- Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
for (LiveInstance instance : liveInstanceMap.values()) {
Map<String, CurrentState> currentStateMap =
@@ -218,7 +218,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
private void verifyP2PEnabled(long startTime) {
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
dataCache.refresh(_accessor);
- Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+ Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
for (LiveInstance instance : liveInstanceMap.values()) {
Map<String, CurrentState> currentStateMap =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
index f668d1603..3c389ac3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
@@ -293,13 +293,13 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
int instances = 0;
- for (String liveInstanceName : cache.getLiveInstances().keySet()) {
- if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+ for (String liveInstanceName : cache.getAssignableLiveInstances().keySet()) {
+ if (cache.getAssignableInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
instances++;
}
}
if (instances == 0) {
- instances = cache.getLiveInstances().size();
+ instances = cache.getAssignableLiveInstances().size();
}
ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
if (ev == null) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
index 0fb203e31..49800328d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
@@ -222,7 +222,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
try {
return verifyBalanceExternalView(
accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
- numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(),
+ numberOfPartitions, masterValue, replicas, cache.getAssignableLiveInstances().size(),
cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
} catch (Exception e) {
LOG.debug("Verify failed", e);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
index 1bf67ccae..07a0a5de5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
@@ -68,7 +68,7 @@ public class TestCustomRebalancer {
when(cache.getStateModelDef(stateModelName)).thenReturn(stateModelDef);
when(cache.getDisabledInstancesForPartition(resource.getResourceName(), partitionName))
.thenReturn(ImmutableSet.of(instanceName));
- when(cache.getLiveInstances())
+ when(cache.getAssignableLiveInstances())
.thenReturn(ImmutableMap.of(instanceName, new LiveInstance(instanceName)));
CurrentStateOutput currOutput = new CurrentStateOutput();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
index f7fa7ddd1..4f52d2ecc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
@@ -62,7 +62,7 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
testRebalancerInvoked = true;
- List<String> liveNodes = Lists.newArrayList(clusterData.getLiveInstances().keySet());
+ List<String> liveNodes = Lists.newArrayList(clusterData.getAssignableLiveInstances().keySet());
int i = 0;
for (String partition : currentIdealState.getPartitionSet()) {
int index = i++ % liveNodes.size();
@@ -139,13 +139,13 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
int instances = 0;
- for (String liveInstanceName : cache.getLiveInstances().keySet()) {
- if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+ for (String liveInstanceName : cache.getAssignableLiveInstances().keySet()) {
+ if (cache.getAssignableInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
instances++;
}
}
if (instances == 0) {
- instances = cache.getLiveInstances().size();
+ instances = cache.getAssignableLiveInstances().size();
}
return verifyBalanceExternalView(
accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 9ccc14fdf..b7c90d841 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -41,6 +41,7 @@ import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -61,7 +62,8 @@ public class TestInstanceOperation extends ZkTestBase {
protected static final String HOST = "host";
protected static final String LOGICAL_ID = "logicalId";
protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, HOST, LOGICAL_ID);
-
+ protected static final ImmutableSet<String> TOP_STATE_SET =
+ ImmutableSet.of("MASTER");
protected static final ImmutableSet<String> SECONDARY_STATE_SET =
ImmutableSet.of("SLAVE", "STANDBY");
protected static final ImmutableSet<String> ACCEPTABLE_STATE_SET =
@@ -73,6 +75,7 @@ public class TestInstanceOperation extends ZkTestBase {
List<String> _participantNames = new ArrayList<>();
private Set<String> _allDBs = new HashSet<>();
private ZkHelixClusterVerifier _clusterVerifier;
+ private ZkHelixClusterVerifier _bestPossibleClusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;
@@ -102,6 +105,10 @@ public class TestInstanceOperation extends ZkTestBase {
.setResources(_allDBs)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
+ _bestPossibleClusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(_allDBs)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
_configAccessor = new ConfigAccessor(_gZkClient);
_dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
@@ -122,6 +129,7 @@ public class TestInstanceOperation extends ZkTestBase {
clusterConfig.setDelayRebalaceEnabled(true);
clusterConfig.setRebalanceDelayTime(1800000L);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ enabledTopologyAwareRebalance();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
@@ -152,8 +160,8 @@ public class TestInstanceOperation extends ZkTestBase {
for (int i = 0; i < _participants.size(); i++) {
String participantName = _participantNames.get(i);
if (!_originalParticipantNames.contains(participantName)) {
- _participants.get(i).syncStop();
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, participantName, false);
+ _participants.get(i).syncStop();
_gSetupTool.getClusterManagementTool()
.dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName));
droppedParticipants.add(participantName);
@@ -390,6 +398,7 @@ public class TestInstanceOperation extends ZkTestBase {
null);
addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
@@ -680,7 +689,7 @@ public class TestInstanceOperation extends ZkTestBase {
// and adding the SWAP_IN instance to the cluster.
// Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
// but none of them are in a top state.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Set.of(instanceToSwapInName), Collections.emptySet());
@@ -747,7 +756,7 @@ public class TestInstanceOperation extends ZkTestBase {
// and adding the SWAP_IN instance to the cluster.
// Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
// but none of them are in a top state.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Set.of(instanceToSwapInName), Collections.emptySet());
@@ -793,7 +802,7 @@ public class TestInstanceOperation extends ZkTestBase {
InstanceConstants.InstanceOperation.SWAP_OUT);
// Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -808,7 +817,7 @@ public class TestInstanceOperation extends ZkTestBase {
// and adding the SWAP_IN instance to the cluster.
// Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
// but none of them are in a top state.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Set.of(instanceToSwapInName), Collections.emptySet());
@@ -880,7 +889,7 @@ public class TestInstanceOperation extends ZkTestBase {
// Validate that the assignment has not changed since adding the SWAP_IN node.
// During MM, the cluster should not compute new assignment.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -892,7 +901,7 @@ public class TestInstanceOperation extends ZkTestBase {
// Validate that partitions on SWAP_OUT instance does not change after exiting MM
// Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
// but none of them are in a top state.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Set.of(instanceToSwapInName), Collections.emptySet());
@@ -956,16 +965,12 @@ public class TestInstanceOperation extends ZkTestBase {
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
// Validate that the SWAP_IN instance has the same partitions as the SWAP_OUT instance in second top state.
Map<String, String> swapInInstancePartitionsAndStates =
getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
- Assert.assertTrue(
- swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
- Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
- swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
- Assert.assertEquals(swapInInstanceStates.size(), 0);
+ Assert.assertEquals(swapInInstancePartitionsAndStates.keySet().size(), 0);
// Assert canSwapBeCompleted is false because SWAP_OUT instance is disabled.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
@@ -975,23 +980,22 @@ public class TestInstanceOperation extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
- // Assert completeSwapIfPossible is true
- Assert.assertTrue(_gSetupTool.getClusterManagementTool()
- .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
// Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance originally
- // had. Validate they are in second top state because initially disabling SWAP_OUT instance
- // caused all topStates to be handed off to next replica in the preference list.
+ // had. Validate they are in second top state.
swapInInstancePartitionsAndStates =
getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
Assert.assertTrue(
swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
- swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
+ Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
Assert.assertEquals(swapInInstanceStates.size(), 0);
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
@@ -1036,7 +1040,7 @@ public class TestInstanceOperation extends ZkTestBase {
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
// Enable the SWAP_IN instance before we have set the SWAP_OUT instance.
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
@@ -1059,7 +1063,7 @@ public class TestInstanceOperation extends ZkTestBase {
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
// Try to remove the InstanceOperation from the SWAP_IN instance before the SWAP_OUT instance is set.
// This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation
@@ -1108,7 +1112,7 @@ public class TestInstanceOperation extends ZkTestBase {
// and adding the SWAP_IN instance to the cluster.
// Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
// but none of them are in a top state.
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Set.of(instanceToSwapInName), Collections.emptySet());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index 588de917a..f6ef8279d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -174,7 +174,7 @@ public class TestWagedNodeSwap extends ZkTestBase {
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
String zone = instanceConfig.getDomainAsMap().get("zone");
- String domain = String.format("zone=%s,instance=%s", zone, newParticipantName);
+ String domain = String.format("zone=%s,instance=%s", zone, oldParticipantName);
newConfig.setDomain(domain);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 649ff7ec1..1002ee7dd 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -112,8 +112,8 @@ public class TestP2PMessages extends BaseStageTest {
e.printStackTrace();
}
- _instances = _dataCache.getAllInstances();
- _liveInstanceMap = _dataCache.getLiveInstances();
+ _instances = _dataCache.getAssignableInstances();
+ _liveInstanceMap = _dataCache.getAssignableLiveInstances();
_initialStateMap = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
_initialMaster = getTopStateInstance(_initialStateMap.getInstanceStateMap(_db, _partition),
@@ -218,7 +218,7 @@ public class TestP2PMessages extends BaseStageTest {
// Old master (initialMaster) failed the M->S transition,
// but has not forward p2p message to new master (secondMaster) yet.
// Validate: Controller should ignore the ERROR partition and send S->M message to new master.
- String session = _dataCache.getLiveInstances().get(_initialMaster).getEphemeralOwner();
+ String session = _dataCache.getAssignableLiveInstances().get(_initialMaster).getEphemeralOwner();
PropertyKey currentStateKey =
new PropertyKey.Builder(_clusterName).currentState(_initialMaster, session, _db);
CurrentState currentState = accessor.getProperty(currentStateKey);
@@ -308,7 +308,7 @@ public class TestP2PMessages extends BaseStageTest {
private void handleMessage(String instance, String resource) {
PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance);
List<Message> messages = accessor.getChildValues(propertyKey, true);
- String session = _dataCache.getLiveInstances().get(instance).getEphemeralOwner();
+ String session = _dataCache.getAssignableLiveInstances().get(instance).getEphemeralOwner();
for (Message m : messages) {
if (m.getResourceName().equals(resource)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index e849be28c..d8810b153 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -86,8 +86,8 @@ public class TestTargetedTaskStateChange {
when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
- when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
- when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+ when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+ when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
_assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -123,8 +123,8 @@ public class TestTargetedTaskStateChange {
when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
- when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
- when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+ when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+ when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
_assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 6b357a384..d0f0c5715 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -322,7 +322,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
}
protected void setupHelixResources() throws Exception {
- _clusters = createClusters(4);
+ _clusters = createClusters(5);
_gSetupTool.addCluster(_superCluster, true);
_gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
_clusters.add(_superCluster);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 92dfff002..5cfab76a0 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -45,7 +45,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestInstancesAccessor extends AbstractTestClass {
- private final static String CLUSTER_NAME = "TestCluster_0";
+ private final static String CLUSTER_NAME = "TestCluster_4";
@DataProvider
public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() {