You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:40 UTC

[helix] 37/37: Add latency metric components for WAGED rebalancer (#490)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4353f67192adc6c9d6fec1c718f3e1690a60ad0f
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Oct 4 13:50:43 2019 -0700

    Add latency metric components for WAGED rebalancer (#490)
    
    Add WAGED rebalancer metric framework and latency metric implementation
    
    Changelist:
    1. Add WAGED rebalancer metric interface
    2. Implement latency-related metrics
    3. Integrate latency metrics into WAGED rebalancer
    4. Add tests
---
 .../rebalancer/waged/WagedRebalancer.java          | 192 ++++++++++++++-------
 .../ConstraintBasedAlgorithmFactory.java           |   2 +-
 .../stages/BestPossibleStateCalcStage.java         |  44 ++++-
 .../monitoring/mbeans/MonitorDomainNames.java      |   3 +-
 .../helix/monitoring/metrics/MetricCollector.java  | 100 +++++++++++
 .../metrics/WagedRebalancerMetricCollector.java    |  80 +++++++++
 .../implementation/RebalanceLatencyGauge.java      | 104 +++++++++++
 .../model/CountMetric.java}                        |  27 ++-
 .../monitoring/metrics/model/LatencyMetric.java    |  52 ++++++
 .../model/Metric.java}                             |  40 ++++-
 .../rebalancer/waged/TestWagedRebalancer.java      |  22 +--
 .../waged/TestWagedRebalancerMetrics.java          | 132 ++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  16 +-
 13 files changed, 698 insertions(+), 116 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index d211884..53c9840 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -48,6 +48,9 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,10 +67,8 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
   private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      ImmutableSet.of(
-          HelixConstants.ChangeType.RESOURCE_CONFIG,
-          HelixConstants.ChangeType.CLUSTER_CONFIG,
-          HelixConstants.ChangeType.INSTANCE_CONFIG);
+      ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
+          HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
   // The cluster change detector is a stateful object.
   // Make it static to avoid unnecessary reinitialization.
   private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
@@ -76,6 +77,7 @@ public class WagedRebalancer {
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
+  private MetricCollector _metricCollector;
 
   private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
     AssignmentMetadataStore assignmentMetadataStore = null;
@@ -90,7 +92,8 @@ public class WagedRebalancer {
   }
 
   public WagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
+      MetricCollector metricCollector) {
     this(constructAssignmentStore(helixManager),
         ConstraintBasedAlgorithmFactory.getInstance(preferences),
         // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
@@ -99,16 +102,37 @@ public class WagedRebalancer {
         // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
         new DelayedAutoRebalancer(),
         // Helix Manager is required for the rebalancer scheduler
-        helixManager);
+        helixManager, metricCollector);
   }
 
+  /**
+   * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
+   * the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
+   * rebalancer will not emit JMX metrics.
+   * @param assignmentMetadataStore
+   * @param algorithm
+   * @param mappingCalculator
+   */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
-    this(assignmentMetadataStore, algorithm, mappingCalculator, null);
+    this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
+  }
+
+  /**
+   * This constructor will use null for HelixManager and a DelayedAutoRebalancer as the mapping
+   * calculator. With null HelixManager, the rebalancer rebalances solely based on CurrentStates.
+   * @param assignmentMetadataStore
+   * @param algorithm
+   * @param metricCollector
+   */
+  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+      RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector);
   }
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
-      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) {
+      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
+      MetricCollector metricCollector) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the rebalance.");
@@ -117,6 +141,10 @@ public class WagedRebalancer {
     _rebalanceAlgorithm = algorithm;
     _mappingCalculator = mappingCalculator;
     _manager = manager;
+    // If metricCollector is null, instantiate a version that does not register metrics in order to
+    // allow rebalancer to proceed
+    _metricCollector =
+        metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector;
   }
 
   // Release all the resources.
@@ -138,8 +166,7 @@ public class WagedRebalancer {
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
     if (resourceMap.isEmpty()) {
-      LOG.warn("There is no resource to be rebalanced by {}",
-          this.getClass().getSimpleName());
+      LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName());
       return Collections.emptyMap();
     }
 
@@ -169,7 +196,6 @@ public class WagedRebalancer {
             newStateMap == null ? Collections.emptyMap() : newStateMap);
       }
     }
-
     LOG.info("Finish computing new ideal states for resources: {}",
         resourceMap.keySet().toString());
     return newIdealStates;
@@ -191,24 +217,25 @@ public class WagedRebalancer {
               return itemKeys;
             }));
 
+    // Perform Global Baseline Calculation
     if (clusterChanges.keySet().stream()
-        .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
+        .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
       refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
       // Inject a cluster config change for large scale partial rebalance once the baseline changed.
       clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
     }
 
-    Set<String> activeNodes = DelayedRebalanceUtil
-        .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
-            clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-            clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+    Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
+        clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
+        clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
+        clusterData.getClusterConfig());
 
     // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
     delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
 
+    // Perform partial rebalance
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes,
-            currentStateOutput);
+        partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);
 
     // <ResourceName, <State, Priority>>
     Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>();
@@ -238,16 +265,15 @@ public class WagedRebalancer {
     // some delayed rebalanced assignments.
     if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
       applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
-          resourceStatePriorityMap,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
-              resourceMap.keySet()));
+          resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore,
+              currentStateOutput, resourceMap.keySet()));
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final mapping after calculation.
     // This is to avoid persisting it into the assignment store, which impacts the long term
     // assignment evenness and partition movements.
-    finalIdealStateMap.entrySet().stream().forEach(
-        idealStateEntry -> applyUserDefinedPreferenceList(
+    finalIdealStateMap.entrySet().stream()
+        .forEach(idealStateEntry -> applyUserDefinedPreferenceList(
             clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
 
     return finalIdealStateMap;
@@ -258,19 +284,31 @@ public class WagedRebalancer {
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
       final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
+    LatencyMetric globalBaselineCalcLatency = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
+            .name(),
+        LatencyMetric.class);
+    globalBaselineCalcLatency.startMeasuringLatency();
+    // Read the baseline from metadata store
     Map<String, ResourceAssignment> currentBaseline =
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
     // For baseline calculation
     // 1. Ignore node status (disable/offline).
     // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for
     // the baseline.
-    Map<String, ResourceAssignment> newBaseline =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
-            Collections.emptyMap(), currentBaseline);
+    Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterData, clusterChanges,
+        resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
 
+    // Write the new baseline to metadata store
     if (_assignmentMetadataStore != null) {
       try {
+        LatencyMetric writeLatency = _metricCollector.getMetric(
+            WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
+            LatencyMetric.class);
+        writeLatency.startMeasuringLatency();
         _assignmentMetadataStore.persistBaseline(newBaseline);
+        writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
             HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
@@ -278,7 +316,7 @@ public class WagedRebalancer {
     } else {
       LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
     }
-
+    globalBaselineCalcLatency.endMeasuringLatency();
     LOG.info("Finish calculating the new baseline.");
   }
 
@@ -288,20 +326,34 @@ public class WagedRebalancer {
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
+    LatencyMetric partialRebalanceLatency = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
+            .name(),
+        LatencyMetric.class);
+    partialRebalanceLatency.startMeasuringLatency();
+    // TODO: Consider combining the metrics for both baseline/best possible?
+    // Read the baseline from metadata store
     Map<String, ResourceAssignment> currentBaseline =
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-    Map<String, ResourceAssignment> currentBestPossibleAssignment =
-        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
-            resourceMap.keySet());
-    Map<String, ResourceAssignment> newAssignment =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline,
-            currentBestPossibleAssignment);
+
+    // Read the best possible assignment from metadata store
+    Map<String, ResourceAssignment> currentBestPossibleAssignment = getBestPossibleAssignment(
+        _assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
+    // Compute the new assignment
+    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
+        resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment);
 
     if (_assignmentMetadataStore != null) {
       try {
+        LatencyMetric writeLatency = _metricCollector.getMetric(
+            WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
+            LatencyMetric.class);
+        writeLatency.startMeasuringLatency();
         // TODO Test to confirm if persisting the final assignment (with final partition states)
         // would be a better option.
         _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+        writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
             HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
@@ -309,20 +361,20 @@ public class WagedRebalancer {
     } else {
       LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
     }
-
+    partialRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish calculating the new best possible assignment.");
     return newAssignment;
   }
 
   /**
    * Generate the cluster model based on the input and calculate the optimal assignment.
-   * @param clusterData                the cluster data cache.
-   * @param clusterChanges             the detected cluster changes.
-   * @param resourceMap                the rebalancing resources.
-   * @param activeNodes                the alive and enabled nodes.
-   * @param baseline                   the baseline assignment for the algorithm as a reference.
+   * @param clusterData the cluster data cache.
+   * @param clusterChanges the detected cluster changes.
+   * @param resourceMap the rebalancing resources.
+   * @param activeNodes the alive and enabled nodes.
+   * @param baseline the baseline assignment for the algorithm as a reference.
    * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
-   *                                   reference.
+   *          reference.
    * @return the new optimal assignment for the resources.
    */
   private Map<String, ResourceAssignment> calculateAssignment(
@@ -415,7 +467,7 @@ public class WagedRebalancer {
    * @param currentStateOutput
    * @param resources
    * @return The current baseline assignment. If record does not exist in the
-   * assignmentMetadataStore, return the current state assignment.
+   *         assignmentMetadataStore, return the current state assignment.
    * @throws HelixRebalanceException
    */
   private Map<String, ResourceAssignment> getBaselineAssignment(
@@ -424,7 +476,12 @@ public class WagedRebalancer {
     Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
     if (assignmentMetadataStore != null) {
       try {
+        LatencyMetric stateReadLatency = _metricCollector.getMetric(
+            WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
+            LatencyMetric.class);
+        stateReadLatency.startMeasuringLatency();
         currentBaseline = assignmentMetadataStore.getBaseline();
+        stateReadLatency.endMeasuringLatency();
       } catch (HelixException ex) {
         // Report error. and use empty mapping instead.
         LOG.error("Failed to get the current baseline assignment.", ex);
@@ -435,8 +492,7 @@ public class WagedRebalancer {
       }
     }
     if (currentBaseline.isEmpty()) {
-      LOG.warn(
-          "The current baseline assignment record is empty. Use the current states instead.");
+      LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
       currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
     }
     return currentBaseline;
@@ -447,7 +503,7 @@ public class WagedRebalancer {
    * @param currentStateOutput
    * @param resources
    * @return The current best possible assignment. If record does not exist in the
-   * assignmentMetadataStore, return the current state assignment.
+   *         assignmentMetadataStore, return the current state assignment.
    * @throws HelixRebalanceException
    */
   private Map<String, ResourceAssignment> getBestPossibleAssignment(
@@ -456,7 +512,12 @@ public class WagedRebalancer {
     Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
     if (assignmentMetadataStore != null) {
       try {
+        LatencyMetric stateReadLatency = _metricCollector.getMetric(
+            WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
+            LatencyMetric.class);
+        stateReadLatency.startMeasuringLatency();
         currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
+        stateReadLatency.endMeasuringLatency();
       } catch (HelixException ex) {
         // Report error. and use empty mapping instead.
         LOG.error("Failed to get the current best possible assignment.", ex);
@@ -483,8 +544,8 @@ public class WagedRebalancer {
       if (!currentStateMap.isEmpty()) {
         ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
         currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
-          newResourceAssignment
-              .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue());
+          newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
+              currentStateEntry.getValue());
         });
         currentStateAssignment.put(resourceName, newResourceAssignment);
       }
@@ -507,11 +568,10 @@ public class WagedRebalancer {
       Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
       offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
       for (String resource : resourceSet) {
-        DelayedRebalanceUtil
-            .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
-                clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-                clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
-                clusterConfig, _manager);
+        DelayedRebalanceUtil.setRebalanceScheduler(resource, delayedRebalanceEnabled,
+            offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
+            clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
+            clusterConfig.getRebalanceDelayTime(), clusterConfig, _manager);
       }
     } else {
       LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
@@ -523,30 +583,30 @@ public class WagedRebalancer {
    * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
    * might include inactive nodes.
    * This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
-   * @param idealStateMap            the calculated ideal states.
-   * @param clusterData              the cluster data cache.
-   * @param resourceMap              the rebalanaced resource map.
-   * @param clusterChanges           the detected cluster changes that triggeres the rebalance.
+   * @param idealStateMap the calculated ideal states.
+   * @param clusterData the cluster data cache.
+   * @param resourceMap the rebalanced resource map.
+   * @param clusterChanges the detected cluster changes that trigger the rebalance.
    * @param resourceStatePriorityMap the state priority map for each resource.
-   * @param baseline                 the baseline assignment
+   * @param baseline the baseline assignment
    */
   private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
       Map<String, Map<String, Integer>> resourceStatePriorityMap,
-      Map<String, ResourceAssignment> baseline)
-      throws HelixRebalanceException {
+      Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
     Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
-    // Note that the calculation used the baseline as the input only. This is for minimizing unnecessary partition movement.
-    Map<String, ResourceAssignment> activeAssignment =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
-            Collections.emptyMap(), baseline);
+    // Note that the calculation used the baseline as the input only. This is for minimizing
+    // unnecessary partition movement.
+    Map<String, ResourceAssignment> activeAssignment = calculateAssignment(clusterData,
+        clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline);
     for (String resourceName : idealStateMap.keySet()) {
       IdealState is = idealStateMap.get(resourceName);
       if (!activeAssignment.containsKey(resourceName)) {
         throw new HelixRebalanceException(
             "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
-                + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+                + resourceName,
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
       IdealState newActiveIdealState =
@@ -555,9 +615,9 @@ public class WagedRebalancer {
 
       int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia);
-      Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
-          .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(),
-              enabledLiveInstances, Math.min(minActiveReplica, numReplia));
+      Map<String, List<String>> finalPreferenceLists =
+          DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
+              is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia));
 
       is.setPreferenceLists(finalPreferenceLists);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index f70de9a..fbf8b19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
   // enlarge the overall weight of the evenness constraints compared with the movement constraint.
   // TODO: Tune or make the following factor configurable.
   private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
-  private static final Map<String, Float> MODEL = new HashMap<>() {
+  private static final Map<String, Float> MODEL = new HashMap<String, Float>() {
     {
       // The default setting
       put(PartitionMovementConstraint.class.getSimpleName(), 1f);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8c082f1..6f442ea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -26,8 +26,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-
 import java.util.stream.Collectors;
+
+import javax.management.JMException;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
@@ -42,6 +43,8 @@ import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -64,6 +67,8 @@ import org.slf4j.LoggerFactory;
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger logger =
       LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
+  // Create a ThreadLocal of MetricCollector. Metrics can be updated by the controller thread only.
+  private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -253,20 +258,41 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
     // Init rebalancer with the rebalance preferences.
-    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
-        .getGlobalRebalancePreference();
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
+        cache.getClusterConfig().getGlobalRebalancePreference();
+
+    if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
+      try {
+        // If HelixManager is null, we just pass in null for MetricCollector so that a
+        // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
+        // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
+        // in this case, WagedRebalancer will not read/write to metadata store and just use
+        // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
+        // verifying whether the cluster has converged.
+        METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
+            : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
+      } catch (JMException e) {
+        LogUtil.logWarn(logger, _eventId, String.format(
+            "MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
+            e));
+      }
+    }
+
     // TODO avoid creating the rebalancer on every rebalance call for performance enhancement
-    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+    WagedRebalancer wagedRebalancer =
+        new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get());
     try {
-      newIdealStates.putAll(wagedRebalancer
-          .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
+      newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
+          currentStateOutput));
     } catch (HelixRebalanceException ex) {
       // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
       // Since it calculates for all the eligible resources globally, a partial result is invalid.
       // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
-      LogUtil.logError(logger, _eventId, String
-          .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
-              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
+      LogUtil.logError(logger, _eventId,
+          String.format(
+              "Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
+          ex);
     } finally {
       wagedRebalancer.close();
     }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index 73bf057..fee9099 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -28,5 +28,6 @@ public enum MonitorDomainNames {
   HelixThreadPoolExecutor,
   HelixCallback,
   RoutingTableProvider,
-  CLMParticipantReport
+  CLMParticipantReport,
+  Rebalancer
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
new file mode 100644
index 0000000..764557a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.HelixException;
+import org.apache.helix.monitoring.metrics.model.Metric;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+
+/**
+ * Collects and manages all metrics that implement the {@link Metric} interface.
+ */
+public abstract class MetricCollector extends DynamicMBeanProvider {
+  private static final String CLUSTER_NAME_KEY = "ClusterName";
+  private static final String ENTITY_NAME_KEY = "EntityName";
+  private final String _monitorDomainName;
+  private final String _clusterName;
+  private final String _entityName;
+  private final Map<String, Metric> _metricMap = new HashMap<>();
+
+  /**
+   * @param monitorDomainName domain part of the ObjectName and first part of the sensor name
+   * @param clusterName value of the ClusterName key in the MBean name
+   * @param entityName value of the EntityName key in the MBean name
+   */
+  public MetricCollector(String monitorDomainName, String clusterName, String entityName) {
+    _monitorDomainName = monitorDomainName;
+    _clusterName = clusterName;
+    _entityName = entityName;
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>();
+    _metricMap.values().forEach(metric -> dynamicMetrics.add(metric.getDynamicMetric()));
+
+    // The ObjectName is "<domain>:ClusterName=<cluster>, EntityName=<entity>".
+    String mbeanName =
+        String.format("%s=%s, %s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName);
+    doRegister(dynamicMetrics,
+        new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName)));
+    return this;
+  }
+
+  /**
+   * Uses the same domain as {@link #register()} so sensor name and ObjectName stay consistent.
+   */
+  @Override
+  public String getSensorName() {
+    return String.format("%s.%s.%s", _monitorDomainName, _clusterName, _entityName);
+  }
+
+  /**
+   * Adds a metric keyed by its name; a metric whose name is already present is ignored.
+   * @throws HelixException if the metric is not a {@link DynamicMetric}
+   */
+  void addMetric(Metric metric) {
+    if (!(metric instanceof DynamicMetric)) {
+      throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!");
+    }
+    _metricMap.putIfAbsent(metric.getMetricName(), metric);
+  }
+
+  /**
+   * Looks up a metric by name and casts it to the requested type.
+   * @return the metric, or null if no metric has that name
+   * @throws ClassCastException if the metric is not an instance of metricClass
+   */
+  public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) {
+    return metricClass.cast(_metricMap.get(metricName));
+  }
+
+  public Map<String, Metric> getMetricMap() {
+    return _metricMap;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
new file mode 100644
index 0000000..04d804d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -0,0 +1,80 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.management.JMException;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+
+public class WagedRebalancerMetricCollector extends MetricCollector {
+  private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";
+
+  /**
+   * Names of all metrics defined for WagedRebalancer. The constants are camel-cased for
+   * readability, and {@code name()} of each constant is used as the metric name.
+   */
+  public enum WagedRebalancerMetricNames {
+    // Per-stage latency metrics
+    GlobalBaselineCalcLatencyGauge,
+    PartialRebalanceLatencyGauge,
+
+    // The following latency metrics are related to AssignmentMetadataStore
+    StateReadLatencyGauge,
+    StateWriteLatencyGauge
+  }
+
+  /**
+   * Creates the metrics and registers them under the given cluster name.
+   * @throws JMException if the registration fails
+   */
+  public WagedRebalancerMetricCollector(String clusterName) throws JMException {
+    super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
+    createMetrics();
+    register();
+  }
+
+  /**
+   * Creates the metrics without registering them. Intended as the fallback when registration
+   * throws a JMException, so that the rebalancer can still run, just without emitting metrics.
+   */
+  public WagedRebalancerMetricCollector() {
+    super(MonitorDomainNames.Rebalancer.name(), null, null);
+    createMetrics();
+  }
+
+  /**
+   * Instantiates one latency gauge per {@link WagedRebalancerMetricNames} constant and adds each
+   * to this collector. Registration is left to the caller.
+   */
+  private void createMetrics() {
+    addMetric(newLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge));
+    addMetric(newLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge));
+    addMetric(newLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge));
+    addMetric(newLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge));
+  }
+
+  /**
+   * Builds a gauge named after the constant, with getResetIntervalInMs() as its sliding window.
+   */
+  private LatencyMetric newLatencyGauge(WagedRebalancerMetricNames metricName) {
+    return new RebalanceLatencyGauge(metricName.name(), getResetIntervalInMs());
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
new file mode 100644
index 0000000..e96a589
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
@@ -0,0 +1,104 @@
+package org.apache.helix.monitoring.metrics.implementation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RebalanceLatencyGauge extends LatencyMetric {
+  private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class);
+  private static final long VALUE_NOT_SET = -1;
+  private long _lastEmittedMetricValue = VALUE_NOT_SET;
+
+  /**
+   * Creates a latency gauge whose histogram is backed by a sliding time window reservoir.
+   * @param metricName the metric name
+   * @param slidingTimeWindow length of the sliding time window, in milliseconds
+   */
+  public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) {
+    super(metricName, new Histogram(
+        new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS)));
+    _metricName = metricName;
+  }
+
+  /**
+   * WARNING: this method is not thread-safe.
+   * Each call discards any measurement in progress and starts a new one, so the metric recovers
+   * gracefully when the rebalancer failed before ending the previous measurement.
+   */
+  @Override
+  public void startMeasuringLatency() {
+    reset();
+    _startTime = System.currentTimeMillis();
+  }
+
+  /**
+   * WARNING: this method is not thread-safe.
+   * Records the elapsed milliseconds since the matching start call, then clears the state.
+   */
+  @Override
+  public void endMeasuringLatency() {
+    boolean measurementStarted = _startTime != VALUE_NOT_SET && _endTime == VALUE_NOT_SET;
+    if (measurementStarted) {
+      _endTime = System.currentTimeMillis();
+      _lastEmittedMetricValue = _endTime - _startTime;
+      updateValue(_lastEmittedMetricValue);
+    } else {
+      LOG.error(
+          "Needs to call startMeasuringLatency first! Ignoring and resetting the metric. Metric name: {}",
+          _metricName);
+    }
+    reset();
+  }
+
+  @Override
+  public String getMetricName() {
+    return _metricName;
+  }
+
+  @Override
+  public void reset() {
+    _endTime = VALUE_NOT_SET;
+    _startTime = VALUE_NOT_SET;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue());
+  }
+
+  /**
+   * Returns the latency recorded by the last completed measurement, or -1 if there is none yet.
+   */
+  @Override
+  public long getLastEmittedMetricValue() {
+    return _lastEmittedMetricValue;
+  }
+
+  @Override
+  public DynamicMetric getDynamicMetric() {
+    return this;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
index 73bf057..5a7f0ca 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.mbeans;
+package org.apache.helix.monitoring.metrics.model;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,14 +19,23 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
 /**
- * This enum defines all of domain names used with various Helix monitor mbeans.
+ * Represents a count metric and defines methods to help with calculation. A count metric gives a
+ * gauge value of a certain property.
  */
-public enum MonitorDomainNames {
-  ClusterStatus,
-  HelixZkClient,
-  HelixThreadPoolExecutor,
-  HelixCallback,
-  RoutingTableProvider,
-  CLMParticipantReport
+public abstract class CountMetric<V> extends SimpleDynamicMetric<V> implements Metric {
+  protected V _count;
+
+  /**
+   * Passes the name and metric object to SimpleDynamicMetric; {@code _count} is not assigned here.
+   * @param metricName the metric name
+   * @param metricObject the metric object
+   */
+  public CountMetric(String metricName, V metricObject) {
+    super(metricName, metricObject);
+  }
+
+  public abstract void setCount(Object count);
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
new file mode 100644
index 0000000..c8ba5ae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
@@ -0,0 +1,52 @@
+package org.apache.helix.monitoring.metrics.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+
+/**
+ * Represents a latency metric and defines methods to help with calculation. A latency metric
+ * reports how long a particular stage in the logic took, in milliseconds.
+ */
+public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric {
+  protected long _startTime;
+  protected long _endTime;
+  protected String _metricName;
+
+  /**
+   * Passes the name and histogram to HistogramDynamicMetric; _metricName is not assigned here.
+   * @param metricName the metric name
+   * @param metricObject the metric object
+   */
+  public LatencyMetric(String metricName, Histogram metricObject) {
+    super(metricName, metricObject);
+  }
+
+  /**
+   * Starts measuring the latency.
+   */
+  public abstract void startMeasuringLatency();
+
+  /**
+   * Ends measuring the latency.
+   */
+  public abstract void endMeasuringLatency();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
index 73bf057..ba59b4f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.mbeans;
+package org.apache.helix.monitoring.metrics.model;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,14 +19,36 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+
 /**
- * This enum defines all of domain names used with various Helix monitor mbeans.
+ * Defines a generic metric interface.
  */
-public enum MonitorDomainNames {
-  ClusterStatus,
-  HelixZkClient,
-  HelixThreadPoolExecutor,
-  HelixCallback,
-  RoutingTableProvider,
-  CLMParticipantReport
+public interface Metric {
+
+  /**
+   * Gets the name of the metric.
+   */
+  String getMetricName();
+
+  /**
+   * Resets the internal state of this metric.
+   */
+  void reset();
+
+  /**
+   * Returns a string describing the metric along with its name.
+   */
+  String toString();
+
+  /**
+   * Returns the most recently emitted value for the metric at the time of the call.
+   * @return the most recently emitted metric value
+   */
+  long getLastEmittedMetricValue();
+
+  /**
+   * Returns the underlying DynamicMetric.
+   */
+  DynamicMetric getDynamicMetric();
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 96b6523..df368cb 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -133,8 +133,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance")
-  public void testPartialRebalance()
-      throws IOException, HelixRebalanceException {
+  public void testPartialRebalance() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -184,9 +183,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         String resourceName = csEntry.getKey();
         CurrentState cs = csEntry.getValue();
         for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
-          currentStateOutput
-              .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
-                  instanceName, partitionStateEntry.getValue());
+          currentStateOutput.setCurrentState(resourceName,
+              new Partition(partitionStateEntry.getKey()), instanceName,
+              partitionStateEntry.getValue());
         }
       }
     }
@@ -220,8 +219,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
-  public void testNonCompatibleConfiguration()
-      throws IOException, HelixRebalanceException {
+  public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -243,8 +241,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   // TODO test with invalid capacity configuration which will fail the cluster model constructing.
   @Test(dependsOnMethods = "testRebalance")
-  public void testInvalidClusterStatus()
-      throws IOException {
+  public void testInvalidClusterStatus() throws IOException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -291,7 +288,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance")
-  public void testAlgorithmExepction() throws IOException, HelixRebalanceException {
+  public void testAlgorithmException() throws IOException, HelixRebalanceException {
     RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
     when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
         HelixRebalanceException.Type.FAILED_TO_CALCULATE));
@@ -430,9 +427,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.assertTrue(newIdealStates.containsKey(resourceName));
       IdealState is = newIdealStates.get(resourceName);
       ResourceAssignment assignment = expectedResult.get(resourceName);
-      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
-          assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
-              .collect(Collectors.toSet())));
+      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
+          .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
       for (String partitionName : is.getPartitionSet()) {
         Assert.assertEquals(is.getInstanceStateMap(partitionName),
             assignment.getReplicaMap(new Partition(partitionName)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
new file mode 100644
index 0000000..dc0c89e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -0,0 +1,132 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.management.JMException;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
+  private static final String TEST_STRING = "TEST";
+  private MetricCollector _metricCollector;
+  private Set<String> _instances;
+  private MockRebalanceAlgorithm _algorithm;
+  private MockAssignmentMetadataStore _metadataStore;
+
+  /** Prepares the shared fixtures: default instance, mock algorithm and mock metadata store. */
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+    _instances = new HashSet<>();
+    _instances.add(_testInstanceId);
+    _algorithm = new MockRebalanceAlgorithm();
+    _metadataStore = new MockAssignmentMetadataStore();
+  }
+
+  /** Runs one rebalance with a registered collector and expects some metric to exceed zero. */
+  @Test
+  public void testMetricValuePropagation()
+      throws JMException, HelixRebalanceException, IOException {
+    _metadataStore.clearMetadataStore();
+    _metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, _metricCollector);
+
+    // Build one Resource per ideal state, carrying the same partition names.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, idealStateEntry -> {
+          Resource resource = new Resource(idealStateEntry.getKey());
+          for (String partition : idealStateEntry.getValue().getPartitionSet()) {
+            resource.addPartition(partition);
+          }
+          return resource;
+        }));
+    // The returned ideal states are not inspected here; only the emitted metrics matter.
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    boolean anyMetricEmitted = _metricCollector.getMetricMap().values().stream()
+        .anyMatch(metric -> metric.getLastEmittedMetricValue() > 0L);
+    Assert.assertTrue(anyMetricEmitted);
+  }
+
+  /** Extends the base cluster cache with WAGED ideal states and two additional instances. */
+  @Override
+  protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+    ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+    // One FULL_AUTO MasterSlave ideal state per resource, with empty preference lists.
+    Map<String, IdealState> idealStates = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState idealState = new IdealState(resource);
+      idealState.setNumPartitions(_partitionNames.size());
+      idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setReplicas("100");
+      idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+      for (String partition : _partitionNames) {
+        idealState.setPreferenceList(partition, Collections.emptyList());
+      }
+      idealStates.put(resource, idealState);
+    }
+    when(testCache.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocation -> idealStates.get(invocation.getArguments()[0]));
+    when(testCache.getIdealStates()).thenReturn(idealStates);
+
+    // Add two more instances, each with an instance config and a live instance node.
+    for (int i = 1; i < 3; i++) {
+      String instanceName = _testInstanceId + i;
+      _instances.add(instanceName);
+      InstanceConfig instanceConfig = createMockInstanceConfig(instanceName);
+      Map<String, InstanceConfig> instanceConfigs = testCache.getInstanceConfigMap();
+      instanceConfigs.put(instanceName, instanceConfig);
+      when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigs);
+      LiveInstance liveInstance = createMockLiveInstance(instanceName);
+      Map<String, LiveInstance> liveInstances = testCache.getLiveInstances();
+      liveInstances.put(instanceName, liveInstance);
+      when(testCache.getLiveInstances()).thenReturn(liveInstances);
+      when(testCache.getEnabledInstances()).thenReturn(liveInstances.keySet());
+      when(testCache.getEnabledLiveInstances()).thenReturn(liveInstances.keySet());
+    }
+
+    return testCache;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 91db076..7cb1da2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -116,11 +115,11 @@ public abstract class AbstractTestClusterModel {
     // 4. Mock two resources, each with 2 partitions on the default instance.
     // The instance will have the following partitions assigned
     // Resource 1:
-    //          partition 1 - MASTER
-    //          partition 2 - SLAVE
+    // -------------- partition 1 - MASTER
+    // -------------- partition 2 - SLAVE
     // Resource 2:
-    //          partition 3 - MASTER
-    //          partition 4 - SLAVE
+    // -------------- partition 3 - MASTER
+    // -------------- partition 4 - SLAVE
     CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class);
     Map<String, String> partitionStateMap1 = new HashMap<>();
     partitionStateMap1.put(_partitionNames.get(0), "MASTER");
@@ -179,9 +178,10 @@ public abstract class AbstractTestClusterModel {
     for (CurrentState cs : currentStatemap.values()) {
       ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
       // Construct one AssignableReplica for each partition in the current state.
-      cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
-          new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(),
-              entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
+      cs.getPartitionStateMap().entrySet().stream()
+          .forEach(entry -> assignmentSet
+              .add(new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig,
+                  entry.getKey(), entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
     }
     return assignmentSet;
   }