You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:40 UTC
[helix] 37/37: Add latency metric components for WAGED rebalancer
(#490)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4353f67192adc6c9d6fec1c718f3e1690a60ad0f
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Oct 4 13:50:43 2019 -0700
Add latency metric components for WAGED rebalancer (#490)
Add WAGED rebalancer metric framework and latency metric implementation
Changelist:
1. Add WAGED rebalancer metric interface
2. Implement latency-related metrics
3. Integrate latency metrics into WAGED rebalancer
4. Add tests
---
.../rebalancer/waged/WagedRebalancer.java | 192 ++++++++++++++-------
.../ConstraintBasedAlgorithmFactory.java | 2 +-
.../stages/BestPossibleStateCalcStage.java | 44 ++++-
.../monitoring/mbeans/MonitorDomainNames.java | 3 +-
.../helix/monitoring/metrics/MetricCollector.java | 100 +++++++++++
.../metrics/WagedRebalancerMetricCollector.java | 80 +++++++++
.../implementation/RebalanceLatencyGauge.java | 104 +++++++++++
.../model/CountMetric.java} | 27 ++-
.../monitoring/metrics/model/LatencyMetric.java | 52 ++++++
.../model/Metric.java} | 40 ++++-
.../rebalancer/waged/TestWagedRebalancer.java | 22 +--
.../waged/TestWagedRebalancerMetrics.java | 132 ++++++++++++++
.../waged/model/AbstractTestClusterModel.java | 16 +-
13 files changed, 698 insertions(+), 116 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index d211884..53c9840 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -48,6 +48,9 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,10 +67,8 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- ImmutableSet.of(
- HelixConstants.ChangeType.RESOURCE_CONFIG,
- HelixConstants.ChangeType.CLUSTER_CONFIG,
- HelixConstants.ChangeType.INSTANCE_CONFIG);
+ ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
+ HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// The cluster change detector is a stateful object.
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
@@ -76,6 +77,7 @@ public class WagedRebalancer {
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
+ private MetricCollector _metricCollector;
private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
AssignmentMetadataStore assignmentMetadataStore = null;
@@ -90,7 +92,8 @@ public class WagedRebalancer {
}
public WagedRebalancer(HelixManager helixManager,
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
+ MetricCollector metricCollector) {
this(constructAssignmentStore(helixManager),
ConstraintBasedAlgorithmFactory.getInstance(preferences),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
@@ -99,16 +102,37 @@ public class WagedRebalancer {
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer(),
// Helix Manager is required for the rebalancer scheduler
- helixManager);
+ helixManager, metricCollector);
}
+ /**
+ * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
+ * the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
+ * rebalancer will not emit JMX metrics.
+ * @param assignmentMetadataStore
+ * @param algorithm
+ * @param mappingCalculator
+ */
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
- this(assignmentMetadataStore, algorithm, mappingCalculator, null);
+ this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
+ }
+
+ /**
+ * This constructor will use null for HelixManager and a DelayedAutoRebalancer as the mapping
+ * calculator. With null HelixManager, the rebalancer will rebalance solely based on CurrentStates.
+ * @param assignmentMetadataStore
+ * @param algorithm
+ * @param metricCollector
+ */
+ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+ RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
+ this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector);
}
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
- RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) {
+ RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
+ MetricCollector metricCollector) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
@@ -117,6 +141,10 @@ public class WagedRebalancer {
_rebalanceAlgorithm = algorithm;
_mappingCalculator = mappingCalculator;
_manager = manager;
+ // If metricCollector is null, instantiate a version that does not register metrics in order to
+ // allow rebalancer to proceed
+ _metricCollector =
+ metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector;
}
// Release all the resources.
@@ -138,8 +166,7 @@ public class WagedRebalancer {
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
if (resourceMap.isEmpty()) {
- LOG.warn("There is no resource to be rebalanced by {}",
- this.getClass().getSimpleName());
+ LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName());
return Collections.emptyMap();
}
@@ -169,7 +196,6 @@ public class WagedRebalancer {
newStateMap == null ? Collections.emptyMap() : newStateMap);
}
}
-
LOG.info("Finish computing new ideal states for resources: {}",
resourceMap.keySet().toString());
return newIdealStates;
@@ -191,24 +217,25 @@ public class WagedRebalancer {
return itemKeys;
}));
+ // Perform Global Baseline Calculation
if (clusterChanges.keySet().stream()
- .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
+ .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
// Inject a cluster config change for large scale partial rebalance once the baseline changed.
clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
}
- Set<String> activeNodes = DelayedRebalanceUtil
- .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+ Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
+ clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
+ clusterData.getClusterConfig());
// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
+ // Perform partial rebalance
Map<String, ResourceAssignment> newAssignment =
- partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes,
- currentStateOutput);
+ partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);
// <ResourceName, <State, Priority>>
Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>();
@@ -238,16 +265,15 @@ public class WagedRebalancer {
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
- resourceStatePriorityMap,
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet()));
+ resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore,
+ currentStateOutput, resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
- finalIdealStateMap.entrySet().stream().forEach(
- idealStateEntry -> applyUserDefinedPreferenceList(
+ finalIdealStateMap.entrySet().stream()
+ .forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
return finalIdealStateMap;
@@ -258,19 +284,31 @@ public class WagedRebalancer {
Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
+ LatencyMetric globalBaselineCalcLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
+ .name(),
+ LatencyMetric.class);
+ globalBaselineCalcLatency.startMeasuringLatency();
+ // Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
// For baseline calculation
// 1. Ignore node status (disable/offline).
// 2. Use the baseline as the previous best possible assignment since there is no "baseline" for
// the baseline.
- Map<String, ResourceAssignment> newBaseline =
- calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
- Collections.emptyMap(), currentBaseline);
+ Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterData, clusterChanges,
+ resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
+ // Write the new baseline to metadata store
if (_assignmentMetadataStore != null) {
try {
+ LatencyMetric writeLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
+ LatencyMetric.class);
+ writeLatency.startMeasuringLatency();
_assignmentMetadataStore.persistBaseline(newBaseline);
+ writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
@@ -278,7 +316,7 @@ public class WagedRebalancer {
} else {
LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
}
-
+ globalBaselineCalcLatency.endMeasuringLatency();
LOG.info("Finish calculating the new baseline.");
}
@@ -288,20 +326,34 @@ public class WagedRebalancer {
Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
+ LatencyMetric partialRebalanceLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
+ .name(),
+ LatencyMetric.class);
+ partialRebalanceLatency.startMeasuringLatency();
+ // TODO: Consider combining the metrics for both baseline/best possible?
+ // Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
- Map<String, ResourceAssignment> currentBestPossibleAssignment =
- getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet());
- Map<String, ResourceAssignment> newAssignment =
- calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline,
- currentBestPossibleAssignment);
+
+ // Read the best possible assignment from metadata store
+ Map<String, ResourceAssignment> currentBestPossibleAssignment = getBestPossibleAssignment(
+ _assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
+ // Compute the new assignment
+ Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
+ resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment);
if (_assignmentMetadataStore != null) {
try {
+ LatencyMetric writeLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
+ LatencyMetric.class);
+ writeLatency.startMeasuringLatency();
// TODO Test to confirm if persisting the final assignment (with final partition states)
// would be a better option.
_assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+ writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
@@ -309,20 +361,20 @@ public class WagedRebalancer {
} else {
LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
}
-
+ partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
return newAssignment;
}
/**
* Generate the cluster model based on the input and calculate the optimal assignment.
- * @param clusterData the cluster data cache.
- * @param clusterChanges the detected cluster changes.
- * @param resourceMap the rebalancing resources.
- * @param activeNodes the alive and enabled nodes.
- * @param baseline the baseline assignment for the algorithm as a reference.
+ * @param clusterData the cluster data cache.
+ * @param clusterChanges the detected cluster changes.
+ * @param resourceMap the rebalancing resources.
+ * @param activeNodes the alive and enabled nodes.
+ * @param baseline the baseline assignment for the algorithm as a reference.
* @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
- * reference.
+ * reference.
* @return the new optimal assignment for the resources.
*/
private Map<String, ResourceAssignment> calculateAssignment(
@@ -415,7 +467,7 @@ public class WagedRebalancer {
* @param currentStateOutput
* @param resources
* @return The current baseline assignment. If record does not exist in the
- * assignmentMetadataStore, return the current state assignment.
+ * assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
private Map<String, ResourceAssignment> getBaselineAssignment(
@@ -424,7 +476,12 @@ public class WagedRebalancer {
Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
if (assignmentMetadataStore != null) {
try {
+ LatencyMetric stateReadLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
+ LatencyMetric.class);
+ stateReadLatency.startMeasuringLatency();
currentBaseline = assignmentMetadataStore.getBaseline();
+ stateReadLatency.endMeasuringLatency();
} catch (HelixException ex) {
// Report error. and use empty mapping instead.
LOG.error("Failed to get the current baseline assignment.", ex);
@@ -435,8 +492,7 @@ public class WagedRebalancer {
}
}
if (currentBaseline.isEmpty()) {
- LOG.warn(
- "The current baseline assignment record is empty. Use the current states instead.");
+ LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
}
return currentBaseline;
@@ -447,7 +503,7 @@ public class WagedRebalancer {
* @param currentStateOutput
* @param resources
* @return The current best possible assignment. If record does not exist in the
- * assignmentMetadataStore, return the current state assignment.
+ * assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
private Map<String, ResourceAssignment> getBestPossibleAssignment(
@@ -456,7 +512,12 @@ public class WagedRebalancer {
Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
if (assignmentMetadataStore != null) {
try {
+ LatencyMetric stateReadLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
+ LatencyMetric.class);
+ stateReadLatency.startMeasuringLatency();
currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
+ stateReadLatency.endMeasuringLatency();
} catch (HelixException ex) {
// Report error. and use empty mapping instead.
LOG.error("Failed to get the current best possible assignment.", ex);
@@ -483,8 +544,8 @@ public class WagedRebalancer {
if (!currentStateMap.isEmpty()) {
ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
- newResourceAssignment
- .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue());
+ newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
+ currentStateEntry.getValue());
});
currentStateAssignment.put(resourceName, newResourceAssignment);
}
@@ -507,11 +568,10 @@ public class WagedRebalancer {
Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
for (String resource : resourceSet) {
- DelayedRebalanceUtil
- .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
- clusterConfig, _manager);
+ DelayedRebalanceUtil.setRebalanceScheduler(resource, delayedRebalanceEnabled,
+ offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
+ clusterConfig.getRebalanceDelayTime(), clusterConfig, _manager);
}
} else {
LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
@@ -523,30 +583,30 @@ public class WagedRebalancer {
* Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
* might include inactive nodes.
* This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
- * @param idealStateMap the calculated ideal states.
- * @param clusterData the cluster data cache.
- * @param resourceMap the rebalanaced resource map.
- * @param clusterChanges the detected cluster changes that triggeres the rebalance.
+ * @param idealStateMap the calculated ideal states.
+ * @param clusterData the cluster data cache.
+ * @param resourceMap the rebalanced resource map.
+ * @param clusterChanges the detected cluster changes that trigger the rebalance.
* @param resourceStatePriorityMap the state priority map for each resource.
- * @param baseline the baseline assignment
+ * @param baseline the baseline assignment
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, Map<String, Integer>> resourceStatePriorityMap,
- Map<String, ResourceAssignment> baseline)
- throws HelixRebalanceException {
+ Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
- // Note that the calculation used the baseline as the input only. This is for minimizing unnecessary partition movement.
- Map<String, ResourceAssignment> activeAssignment =
- calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
- Collections.emptyMap(), baseline);
+ // Note that the calculation used the baseline as the input only. This is for minimizing
+ // unnecessary partition movement.
+ Map<String, ResourceAssignment> activeAssignment = calculateAssignment(clusterData,
+ clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline);
for (String resourceName : idealStateMap.keySet()) {
IdealState is = idealStateMap.get(resourceName);
if (!activeAssignment.containsKey(resourceName)) {
throw new HelixRebalanceException(
"Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
- + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ + resourceName,
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
IdealState currentIdealState = clusterData.getIdealState(resourceName);
IdealState newActiveIdealState =
@@ -555,9 +615,9 @@ public class WagedRebalancer {
int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia);
- Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
- .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(),
- enabledLiveInstances, Math.min(minActiveReplica, numReplia));
+ Map<String, List<String>> finalPreferenceLists =
+ DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
+ is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia));
is.setPreferenceLists(finalPreferenceLists);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index f70de9a..fbf8b19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
// enlarge the overall weight of the evenness constraints compared with the movement constraint.
// TODO: Tune or make the following factor configurable.
private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
- private static final Map<String, Float> MODEL = new HashMap<>() {
+ private static final Map<String, Float> MODEL = new HashMap<String, Float>() {
{
// The default setting
put(PartitionMovementConstraint.class.getSimpleName(), 1f);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8c082f1..6f442ea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -26,8 +26,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-
import java.util.stream.Collectors;
+
+import javax.management.JMException;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
@@ -42,6 +43,8 @@ import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -64,6 +67,8 @@ import org.slf4j.LoggerFactory;
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger logger =
LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
+ // Create a ThreadLocal of MetricCollector. Metrics can only be updated by the controller thread.
+ private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();
@Override
public void process(ClusterEvent event) throws Exception {
@@ -253,20 +258,41 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Map<String, IdealState> newIdealStates = new HashMap<>();
// Init rebalancer with the rebalance preferences.
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
- .getGlobalRebalancePreference();
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
+ cache.getClusterConfig().getGlobalRebalancePreference();
+
+ if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
+ try {
+ // If HelixManager is null, we just pass in null for MetricCollector so that a
+ // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
+ // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
+ // in this case, WagedRebalancer will not read/write to metadata store and just use
+ // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
+ // verifying whether the cluster has converged.
+ METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
+ : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
+ } catch (JMException e) {
+ LogUtil.logWarn(logger, _eventId, String.format(
+ "MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
+ e));
+ }
+ }
+
// TODO avoid creating the rebalancer on every rebalance call for performance enhancement
- WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+ WagedRebalancer wagedRebalancer =
+ new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get());
try {
- newIdealStates.putAll(wagedRebalancer
- .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
+ newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
+ currentStateOutput));
} catch (HelixRebalanceException ex) {
// Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
// Since it calculates for all the eligible resources globally, a partial result is invalid.
// TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
- LogUtil.logError(logger, _eventId, String
- .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
- wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
+ LogUtil.logError(logger, _eventId,
+ String.format(
+ "Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+ wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
+ ex);
} finally {
wagedRebalancer.close();
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index 73bf057..fee9099 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -28,5 +28,6 @@ public enum MonitorDomainNames {
HelixThreadPoolExecutor,
HelixCallback,
RoutingTableProvider,
- CLMParticipantReport
+ CLMParticipantReport,
+ Rebalancer
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
new file mode 100644
index 0000000..764557a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.HelixException;
+import org.apache.helix.monitoring.metrics.model.Metric;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+
+/**
+ * Collects and manages all metrics that implement the {@link Metric} interface.
+ */
+public abstract class MetricCollector extends DynamicMBeanProvider {
+ private static final String CLUSTER_NAME_KEY = "ClusterName";
+ private static final String ENTITY_NAME_KEY = "EntityName";
+ private final String _monitorDomainName;
+ private final String _clusterName;
+ private final String _entityName;
+ private Map<String, Metric> _metricMap;
+
+ public MetricCollector(String monitorDomainName, String clusterName, String entityName) {
+ _monitorDomainName = monitorDomainName;
+ _clusterName = clusterName;
+ _entityName = entityName;
+ _metricMap = new HashMap<>();
+ }
+
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ // First cast all Metric objects to DynamicMetrics
+ Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>();
+ _metricMap.values().forEach(metric -> dynamicMetrics.add(metric.getDynamicMetric()));
+
+ // Define MBeanName and ObjectName
+ // MBean name has two key-value pairs:
+ // ------ 1) ClusterName KV pair (first %s=%s)
+ // ------ 2) EntityName KV pair (second %s=%s)
+ String mbeanName =
+ String.format("%s=%s, %s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName);
+
+ // ObjectName has one key-value pair:
+ // ------ 1) Monitor domain name KV pair where value is the MBean name
+ doRegister(dynamicMetrics,
+ new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName)));
+ return this;
+ }
+
+ @Override
+ public String getSensorName() {
+ return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName,
+ _entityName);
+ }
+
+ void addMetric(Metric metric) {
+ if (metric instanceof DynamicMetric) {
+ _metricMap.putIfAbsent(metric.getMetricName(), metric);
+ } else {
+ throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!");
+ }
+ }
+
+ /**
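+ * Returns the metric registered under the given name, cast to the desired type.
+ * @param metricName Name of the metric to look up
+ * @param metricClass Desired type
+ * @param <T> Type of the returned metric
+ * @return The metric cast to T, or null if no metric is registered under metricName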
+ */
+ public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) {
+ return metricClass.cast(_metricMap.get(metricName));
+ }
+
+ public Map<String, Metric> getMetricMap() {
+ return _metricMap;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
new file mode 100644
index 0000000..04d804d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -0,0 +1,80 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.management.JMException;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+
+public class WagedRebalancerMetricCollector extends MetricCollector {
+ private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";
+
+ /**
+ * This enum class contains all metric names defined for WagedRebalancer. Note that all enum
+ * constants are in upper camel case for readability.
+ */
+ public enum WagedRebalancerMetricNames {
+ // Per-stage latency metrics
+ GlobalBaselineCalcLatencyGauge,
+ PartialRebalanceLatencyGauge,
+
+ // The following latency metrics are related to AssignmentMetadataStore
+ StateReadLatencyGauge,
+ StateWriteLatencyGauge
+ }
+
+ public WagedRebalancerMetricCollector(String clusterName) throws JMException {
+ super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
+ createMetrics();
+ register();
+ }
+
+ /**
+ * This constructor will create but will not register metrics. This constructor will be used in
+ * case of JMException so that the rebalancer could proceed without registering and emitting
+ * metrics.
+ */
+ public WagedRebalancerMetricCollector() {
+ super(MonitorDomainNames.Rebalancer.name(), null, null);
+ createMetrics();
+ }
+
+ /**
+ * Creates all WagedRebalancer metrics and adds them to this collector; does not register them.
+ */
+ private void createMetrics() {
+ // Define all metrics
+ LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge(
+ WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs());
+ LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge(
+ WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs());
+ LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge(
+ WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs());
+ LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge(
+ WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs());
+
+ // Add metrics to WagedRebalancerMetricCollector
+ addMetric(globalBaselineCalcLatencyGauge);
+ addMetric(partialRebalanceLatencyGauge);
+ addMetric(stateReadLatencyGauge);
+ addMetric(stateWriteLatencyGauge);
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
new file mode 100644
index 0000000..e96a589
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
@@ -0,0 +1,104 @@
+package org.apache.helix.monitoring.metrics.implementation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RebalanceLatencyGauge extends LatencyMetric {
+ private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class);
+ private static final long VALUE_NOT_SET = -1;
+ private long _lastEmittedMetricValue = VALUE_NOT_SET;
+
+ /**
+ * Instantiates a latency gauge whose Histogram uses a sliding window of slidingTimeWindow ms.
+ * @param metricName the metric name
+ */
+ public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) {
+ super(metricName, new Histogram(
+ new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS)));
+ _metricName = metricName;
+ }
+
+ /**
+ * WARNING: this method is not thread-safe.
+ * Calling this method multiple times would simply overwrite the previous state. This is because
+ * the rebalancer could fail at any point, and we want it to recover gracefully by resetting the
+ * internal state of this metric.
+ */
+ @Override
+ public void startMeasuringLatency() {
+ reset();
+ _startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * WARNING: this method is not thread-safe.
+ */
+ @Override
+ public void endMeasuringLatency() {
+ if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) {
+ LOG.error(
+ "Needs to call startMeasuringLatency first! Ignoring and resetting the metric. Metric name: {}",
+ _metricName);
+ reset();
+ return;
+ }
+ _endTime = System.currentTimeMillis();
+ _lastEmittedMetricValue = _endTime - _startTime;
+ updateValue(_lastEmittedMetricValue);
+ reset();
+ }
+
+ @Override
+ public String getMetricName() {
+ return _metricName;
+ }
+
+ @Override
+ public void reset() {
+ _startTime = VALUE_NOT_SET;
+ _endTime = VALUE_NOT_SET;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue());
+ }
+
+ /**
+ * Returns the most recently emitted metric value at the time of the call.
+ * @return the last latency (ms) recorded by endMeasuringLatency, or -1 if none was recorded
+ */
+ @Override
+ public long getLastEmittedMetricValue() {
+ return _lastEmittedMetricValue;
+ }
+
+ @Override
+ public DynamicMetric getDynamicMetric() {
+ return this;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
index 73bf057..5a7f0ca 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.mbeans;
+package org.apache.helix.monitoring.metrics.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,14 +19,23 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
/**
- * This enum defines all of domain names used with various Helix monitor mbeans.
+ * Represents a count metric and defines methods to help with calculation. A count metric gives a
+ * gauge value of a certain property.
*/
-public enum MonitorDomainNames {
- ClusterStatus,
- HelixZkClient,
- HelixThreadPoolExecutor,
- HelixCallback,
- RoutingTableProvider,
- CLMParticipantReport
+public abstract class CountMetric<V> extends SimpleDynamicMetric<V> implements Metric {
+ protected V _count;
+
+ /**
+ * Instantiates a new Simple dynamic metric.
+ * @param metricName the metric name
+ * @param metricObject the metric object
+ */
+ public CountMetric(String metricName, V metricObject) {
+ super(metricName, metricObject);
+ }
+
+ public abstract void setCount(Object count);
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
new file mode 100644
index 0000000..c8ba5ae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
@@ -0,0 +1,52 @@
+package org.apache.helix.monitoring.metrics.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+
+/**
+ * Represents a latency metric and defines methods to help with calculation. A latency metric gives
+ * how long a particular stage in the logic took in milliseconds.
+ */
+public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric {
+ protected long _startTime;
+ protected long _endTime;
+ protected String _metricName;
+
+ /**
+ * Instantiates a new Histogram dynamic metric.
+ * @param metricName the metric name
+ * @param metricObject the metric object
+ */
+ public LatencyMetric(String metricName, Histogram metricObject) {
+ super(metricName, metricObject);
+ }
+
+ /**
+ * Starts measuring the latency.
+ */
+ public abstract void startMeasuringLatency();
+
+ /**
+ * Ends measuring the latency.
+ */
+ public abstract void endMeasuringLatency();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
index 73bf057..ba59b4f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.mbeans;
+package org.apache.helix.monitoring.metrics.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,14 +19,36 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+
/**
- * This enum defines all of domain names used with various Helix monitor mbeans.
+ * Defines a generic metric interface.
*/
-public enum MonitorDomainNames {
- ClusterStatus,
- HelixZkClient,
- HelixThreadPoolExecutor,
- HelixCallback,
- RoutingTableProvider,
- CLMParticipantReport
+public interface Metric {
+
+ /**
+ * Gets the name of the metric.
+ */
+ String getMetricName();
+
+ /**
+ * Resets the internal state of this metric.
+ */
+ void reset();
+
+ /**
+ * Prints the metric along with its name.
+ */
+ String toString();
+
+ /**
+ * Returns the most recently emitted value for the metric at the time of the call.
+ * @return metric value
+ */
+ long getLastEmittedMetricValue();
+
+ /**
+ * Returns the underlying DynamicMetric.
+ */
+ DynamicMetric getDynamicMetric();
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 96b6523..df368cb 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -133,8 +133,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance")
- public void testPartialRebalance()
- throws IOException, HelixRebalanceException {
+ public void testPartialRebalance() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
WagedRebalancer rebalancer =
new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -184,9 +183,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
String resourceName = csEntry.getKey();
CurrentState cs = csEntry.getValue();
for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
- currentStateOutput
- .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
- instanceName, partitionStateEntry.getValue());
+ currentStateOutput.setCurrentState(resourceName,
+ new Partition(partitionStateEntry.getKey()), instanceName,
+ partitionStateEntry.getValue());
}
}
}
@@ -220,8 +219,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
- public void testNonCompatibleConfiguration()
- throws IOException, HelixRebalanceException {
+ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
WagedRebalancer rebalancer =
new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -243,8 +241,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// TODO test with invalid capacity configuration which will fail the cluster model constructing.
@Test(dependsOnMethods = "testRebalance")
- public void testInvalidClusterStatus()
- throws IOException {
+ public void testInvalidClusterStatus() throws IOException {
_metadataStore.clearMetadataStore();
WagedRebalancer rebalancer =
new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -291,7 +288,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance")
- public void testAlgorithmExepction() throws IOException, HelixRebalanceException {
+ public void testAlgorithmException() throws IOException, HelixRebalanceException {
RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE));
@@ -430,9 +427,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.assertTrue(newIdealStates.containsKey(resourceName));
IdealState is = newIdealStates.get(resourceName);
ResourceAssignment assignment = expectedResult.get(resourceName);
- Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
- assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
- .collect(Collectors.toSet())));
+ Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
+ .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
for (String partitionName : is.getPartitionSet()) {
Assert.assertEquals(is.getInstanceStateMap(partitionName),
assignment.getReplicaMap(new Partition(partitionName)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
new file mode 100644
index 0000000..dc0c89e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -0,0 +1,132 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.management.JMException;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
+ private static final String TEST_STRING = "TEST";
+ private MetricCollector _metricCollector;
+ private Set<String> _instances;
+ private MockRebalanceAlgorithm _algorithm;
+ private MockAssignmentMetadataStore _metadataStore;
+
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ _instances = new HashSet<>();
+ _instances.add(_testInstanceId);
+ _algorithm = new MockRebalanceAlgorithm();
+
+ // Initialize a mock assignment metadata store
+ _metadataStore = new MockAssignmentMetadataStore();
+ }
+
+ @Test
+ public void testMetricValuePropagation()
+ throws JMException, HelixRebalanceException, IOException {
+ _metadataStore.clearMetadataStore();
+ _metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, _metricCollector);
+
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+ Map<String, IdealState> newIdealStates =
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+
+ // Check that there exists a non-zero value in the metrics
+ Assert.assertTrue(_metricCollector.getMetricMap().values().stream()
+ .anyMatch(metric -> metric.getLastEmittedMetricValue() > 0L));
+ }
+
+ @Override
+ protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+ ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+ // Set up mock idealstate
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ IdealState is = new IdealState(resource);
+ is.setNumPartitions(_partitionNames.size());
+ is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ is.setStateModelDefRef("MasterSlave");
+ is.setReplicas("100");
+ is.setRebalancerClassName(WagedRebalancer.class.getName());
+ _partitionNames.stream()
+ .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
+ isMap.put(resource, is);
+ }
+ when(testCache.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(testCache.getIdealStates()).thenReturn(isMap);
+
+ // Set up 2 more instances
+ for (int i = 1; i < 3; i++) {
+ String instanceName = _testInstanceId + i;
+ _instances.add(instanceName);
+ // 1. Set up the instance information with capacity configuration for this extra instance.
+ InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
+ Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+ instanceConfigMap.put(instanceName, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ // 2. Mock the live instance node for this extra instance.
+ LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
+ Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+ liveInstanceMap.put(instanceName, testLiveInstance);
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ }
+
+ return testCache;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 91db076..7cb1da2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
@@ -116,11 +115,11 @@ public abstract class AbstractTestClusterModel {
// 4. Mock two resources, each with 2 partitions on the default instance.
// The instance will have the following partitions assigned
// Resource 1:
- // partition 1 - MASTER
- // partition 2 - SLAVE
+ // -------------- partition 1 - MASTER
+ // -------------- partition 2 - SLAVE
// Resource 2:
- // partition 3 - MASTER
- // partition 4 - SLAVE
+ // -------------- partition 3 - MASTER
+ // -------------- partition 4 - SLAVE
CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class);
Map<String, String> partitionStateMap1 = new HashMap<>();
partitionStateMap1.put(_partitionNames.get(0), "MASTER");
@@ -179,9 +178,10 @@ public abstract class AbstractTestClusterModel {
for (CurrentState cs : currentStatemap.values()) {
ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
// Construct one AssignableReplica for each partition in the current state.
- cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
- new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(),
- entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
+ cs.getPartitionStateMap().entrySet().stream()
+ .forEach(entry -> assignmentSet
+ .add(new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig,
+ entry.getKey(), entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
}
return assignmentSet;
}