You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:21 UTC

[helix] 43/50: Implement monitoring mbeans for the WAGED rebalancer. (#525)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9196bdd503d965d7a702573c112a440ca5606102
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Fri Oct 25 10:50:51 2019 -0700

    Implement monitoring mbeans for the WAGED rebalancer. (#525)
    
    Change list:
    1. GlobalBaselineCalcCounter: Counter of global baseline calculations.
    2. PartialRebalanceCounter: Counter of partial rebalances performed.
    3. BaselineDivergenceGauge: Gauge of the difference at replica level between the Baseline and the Best Possible assignments.
---
 .../rebalancer/util/ResourceUsageCalculator.java   | 112 ++++++++++++++++++++-
 .../rebalancer/waged/WagedRebalancer.java          |  21 ++++
 .../metrics/WagedRebalancerMetricCollector.java    |  25 ++++-
 .../implementation/BaselineDivergenceGauge.java    |  68 +++++++++++++
 .../RebalanceCounter.java}                         |  38 +++----
 .../implementation/RebalanceLatencyGauge.java      |   2 +-
 .../monitoring/metrics/model/CountMetric.java      |   4 +-
 .../monitoring/metrics/model/LatencyMetric.java    |   2 +-
 .../helix/monitoring/metrics/model/Metric.java     |   5 +-
 .../model/{CountMetric.java => RatioMetric.java}   |  41 +++-----
 .../util/TestResourceUsageCalculator.java          |  87 ++++++++++++++++
 .../rebalancer/waged/TestWagedRebalancer.java      |   1 +
 .../waged/TestWagedRebalancerMetrics.java          |  59 ++++++++++-
 ...eUsageCalculator.MeasureBaselineDivergence.json |  37 +++++++
 14 files changed, 442 insertions(+), 60 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
index b47a2ed..becf72a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
@@ -1,11 +1,32 @@
 package org.apache.helix.controller.rebalancer.util;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
 import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
 
-import java.util.HashMap;
-import java.util.Map;
 
 public class ResourceUsageCalculator {
   /**
@@ -33,4 +54,91 @@ public class ResourceUsageCalculator {
     }
     return newParticipantUsage;
   }
+
+  /**
+   * Measure how closely the best possible assignment agrees with the baseline assignment at the
+   * replica level.
+   * <p>
+   * The result is (compared best possible replicas whose state equals the baseline state on the
+   * same instance) / (compared best possible replicas). Despite the method name, 1.0 means every
+   * compared replica agrees with the baseline and 0.0 means none does.
+   * <p>
+   * Only resources and partitions present in BOTH assignments are compared; those that exist only
+   * in the best possible assignment are skipped and do not count toward the denominator. A replica
+   * on an instance missing from the baseline partition, or with a null state on either side, is
+   * counted as a mismatch.
+   * <p>Example:
+   * baseline =
+   * {
+   *    resource1={
+   *       partition1={
+   *          instance1=master,
+   *          instance2=slave
+   *       },
+   *       partition2={
+   *          instance2=slave
+   *       }
+   *    }
+   * }
+   * bestPossible =
+   * {
+   *    resource1={
+   *       partition1={
+   *          instance1=master,  <--- matched
+   *          instance3=slave    <--- doesn't match
+   *       },
+   *       partition2={
+   *          instance3=master   <--- doesn't match
+   *       }
+   *    }
+   * }
+   * result = (matched: 1) / (compared best possible replicas: 3) = 1/3 ~= 0.333
+   * @param baseline baseline assignment, keyed by resource name
+   * @param bestPossibleAssignment best possible assignment, keyed by resource name
+   * @return the match ratio in [0.0, 1.0], or 0.0 if no best possible replica could be compared
+   *         (for example, when either input is empty)
+   */
+  public static double measureBaselineDivergence(Map<String, ResourceAssignment> baseline,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    int numMatchedReplicas = 0;
+    int numTotalBestPossibleReplicas = 0;
+
+    // 1. Only resources present in both assignments are compared.
+    for (Map.Entry<String, ResourceAssignment> resourceEntry : bestPossibleAssignment.entrySet()) {
+      ResourceAssignment baselineResource = baseline.get(resourceEntry.getKey());
+      if (baselineResource == null) {
+        continue;
+      }
+
+      // 2. Only partitions present in both resource assignments are compared.
+      Map<String, Map<String, String>> bestPossiblePartitions =
+          resourceEntry.getValue().getRecord().getMapFields();
+      Map<String, Map<String, String>> baselinePartitions =
+          baselineResource.getRecord().getMapFields();
+
+      for (Map.Entry<String, Map<String, String>> partitionEntry
+          : bestPossiblePartitions.entrySet()) {
+        Map<String, String> baselineReplicas = baselinePartitions.get(partitionEntry.getKey());
+        if (baselineReplicas == null) {
+          continue;
+        }
+
+        // 3. A replica matches when the baseline assigns the same state on the same instance.
+        // A null state on either side counts as a mismatch instead of throwing an NPE.
+        Map<String, String> bestPossibleReplicas = partitionEntry.getValue();
+        for (Map.Entry<String, String> replicaEntry : bestPossibleReplicas.entrySet()) {
+          String baselineState = baselineReplicas.get(replicaEntry.getKey());
+          if (baselineState != null && baselineState.equals(replicaEntry.getValue())) {
+            numMatchedReplicas++;
+          }
+        }
+
+        // Every replica of a compared partition counts toward the denominator, matched or not.
+        numTotalBestPossibleReplicas += bestPossibleReplicas.size();
+      }
+    }
+
+    return numTotalBestPossibleReplicas == 0 ? 0.0d
+        : (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 9050b59..5b2573f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -50,11 +50,13 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.MetricCollector;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Weight-Aware Globally-Even Distribute Rebalancer.
  * @see <a
@@ -330,6 +332,11 @@ public class WagedRebalancer {
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
       final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
+    CountMetric globalBaselineCalcCounter = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+        CountMetric.class);
+    globalBaselineCalcCounter.increaseCount(1L);
+
     LatencyMetric globalBaselineCalcLatency = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
             .name(),
@@ -372,6 +379,11 @@ public class WagedRebalancer {
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
+    CountMetric partialRebalanceCounter = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+        CountMetric.class);
+    partialRebalanceCounter.increaseCount(1L);
+
     LatencyMetric partialRebalanceLatency = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
             .name(),
@@ -390,6 +402,14 @@ public class WagedRebalancer {
     Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
         resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment);
 
+    // Asynchronously report baseline divergence metric before persisting to metadata store,
+    // just in case if persisting fails, we still have the metric.
+    BaselineDivergenceGauge baselineDivergenceGauge = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+        BaselineDivergenceGauge.class);
+    baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
+        currentBaseline, newAssignment);
+
     if (_assignmentMetadataStore != null) {
       try {
         LatencyMetric writeLatency = _metricCollector.getMetric(
@@ -444,6 +464,7 @@ public class WagedRebalancer {
         optimalAssignment.getOptimalResourceAssignment();
 
     LOG.info("Finish calculating an assignment. Took: {} ms.", System.currentTimeMillis() - startTime);
+
     return newAssignment;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index e9494ff..3dd16ad 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -22,10 +22,14 @@ package org.apache.helix.monitoring.metrics;
 import javax.management.JMException;
 
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
 import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount;
 import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
+
 
 public class WagedRebalancerMetricCollector extends MetricCollector {
   private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";
@@ -43,10 +47,20 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
     StateReadLatencyGauge,
     StateWriteLatencyGauge,
 
+    /*
+     * Gauge of the difference (state and partition allocation) between the baseline and the best
+     * possible assignment.
+     */
+    BaselineDivergenceGauge,
+
     // Count of any rebalance compute failure.
     // Note the rebalancer may still be able to return the last known-good assignment on a rebalance
     // compute failure. And this fallback logic won't impact this counting.
-    RebalanceFailureCounter
+    RebalanceFailureCounter,
+
+    // Waged rebalance counters.
+    GlobalBaselineCalcCounter,
+    PartialRebalanceCounter
   }
 
   public WagedRebalancerMetricCollector(String clusterName) throws JMException {
@@ -82,14 +96,23 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
     LatencyMetric stateWriteLatencyGauge =
         new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
             getResetIntervalInMs());
+    RatioMetric baselineDivergenceGauge =
+        new BaselineDivergenceGauge(WagedRebalancerMetricNames.BaselineDivergenceGauge.name());
     CountMetric calcFailureCount =
         new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name());
+    CountMetric globalBaselineCalcCounter =
+        new RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
+    CountMetric partialRebalanceCounter =
+        new RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
 
     // Add metrics to WagedRebalancerMetricCollector
     addMetric(globalBaselineCalcLatencyGauge);
     addMetric(partialRebalanceLatencyGauge);
     addMetric(stateReadLatencyGauge);
     addMetric(stateWriteLatencyGauge);
+    addMetric(baselineDivergenceGauge);
     addMetric(calcFailureCount);
+    addMetric(globalBaselineCalcCounter);
+    addMetric(partialRebalanceCounter);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java
new file mode 100644
index 0000000..8e6d49b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java
@@ -0,0 +1,68 @@
+package org.apache.helix.monitoring.metrics.implementation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Gauge of how closely the best possible assignment matches the baseline at the replica level.
+ * Its value range is [0.0, 1.0]; 1.0 means every compared replica matches the baseline.
+ */
+public class BaselineDivergenceGauge extends RatioMetric {
+  private static final Logger LOG = LoggerFactory.getLogger(BaselineDivergenceGauge.class);
+
+  /**
+   * Instantiates a new baseline divergence gauge whose initial value is 0.0.
+   * @param metricName the metric name
+   */
+  public BaselineDivergenceGauge(String metricName) {
+    super(metricName, 0.0d);
+  }
+
+  /**
+   * Asynchronously measure the baseline divergence and update this gauge with the result.
+   * @param threadPool an executor service to asynchronously run the task
+   * @param baseline baseline assignment
+   * @param bestPossibleAssignment best possible assignment
+   */
+  public void asyncMeasureAndUpdateValue(ExecutorService threadPool,
+      Map<String, ResourceAssignment> baseline,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    AbstractBaseStage.asyncExecute(threadPool, () -> {
+      try {
+        updateValue(
+            ResourceUsageCalculator.measureBaselineDivergence(baseline, bestPossibleAssignment));
+      } catch (Exception e) {
+        // Best effort: a failed measurement is only logged; the gauge keeps its previous value.
+        LOG.error("Failed to report BaselineDivergenceGauge metric.", e);
+      }
+      return null;
+    });
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
index 22378dc..fc370a8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.metrics.model;
+package org.apache.helix.monitoring.metrics.implementation;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -9,7 +9,7 @@ package org.apache.helix.monitoring.metrics.model;
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -19,31 +19,23 @@ package org.apache.helix.monitoring.metrics.model;
  * under the License.
  */
 
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
 
 /**
- * Defines a generic metric interface.
+ * To report counter type metrics related to rebalance. This monitor monotonically increases values.
  */
-public interface Metric {
-
-  /**
-   * Gets the name of the metric.
-   */
-  String getMetricName();
-
+public class RebalanceCounter extends CountMetric {
   /**
-   * Prints the metric along with its name.
+   * Instantiates a new count metric.
+   * @param metricName the metric name
    */
-  String toString();
+  public RebalanceCounter(String metricName) {
+    super(metricName, 0L);
+  }
 
-  /**
-   * Returns the most recently emitted value for the metric at the time of the call.
-   * @return metric value
-   */
-  long getLastEmittedMetricValue();
-
-  /**
-   * Returns the underlying DynamicMetric.
-   */
-  DynamicMetric getDynamicMetric();
+  @Override
+  public void increaseCount(long count) {
+    updateValue(getValue() + count);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
index b6e58b4..365f0dc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
@@ -76,7 +76,7 @@ public class RebalanceLatencyGauge extends LatencyMetric {
    * @return
    */
   @Override
-  public long getLastEmittedMetricValue() {
+  public Long getLastEmittedMetricValue() {
     return _lastEmittedMetricValue;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
index 424ac9e..81aa001 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
@@ -26,7 +26,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
  * Represents a count metric and defines methods to help with calculation. A count metric gives a
  * gauge value of a certain property.
  */
-public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric {
+public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric<Long> {
 
   /**
    * Instantiates a new count metric.
@@ -56,7 +56,7 @@ public abstract class CountMetric extends SimpleDynamicMetric<Long> implements M
   }
 
   @Override
-  public long getLastEmittedMetricValue() {
+  public Long getLastEmittedMetricValue() {
     return getValue();
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
index d60f245..de63d0f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
@@ -27,7 +27,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
  * Represents a latency metric and defines methods to help with calculation. A latency metric gives
  * how long a particular stage in the logic took in milliseconds.
  */
-public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric {
+public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric<Long> {
   protected long _startTime;
   protected long _endTime;
   protected String _metricName;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
index 22378dc..be7ea80 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
@@ -23,8 +23,9 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 
 /**
  * Defines a generic metric interface.
+ * @param <T> type of input value for the metric
  */
-public interface Metric {
+public interface Metric<T> {
 
   /**
    * Gets the name of the metric.
@@ -40,7 +41,7 @@ public interface Metric {
    * Returns the most recently emitted value for the metric at the time of the call.
    * @return metric value
    */
-  long getLastEmittedMetricValue();
+  T getLastEmittedMetricValue();
 
   /**
    * Returns the underlying DynamicMetric.
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
similarity index 62%
copy from helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
index 424ac9e..d321e51 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
@@ -9,7 +9,7 @@ package org.apache.helix.monitoring.metrics.model;
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -22,46 +22,37 @@ package org.apache.helix.monitoring.metrics.model;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
+
 /**
- * Represents a count metric and defines methods to help with calculation. A count metric gives a
- * gauge value of a certain property.
+ * A gauge which defines the ratio of one value to another.
  */
-public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric {
-
+public abstract class RatioMetric extends SimpleDynamicMetric<Double> implements Metric<Double> {
   /**
-   * Instantiates a new count metric.
-   *
-   * @param metricName the metric name
-   * @param initCount the initial count
+   * Instantiates a new Simple dynamic metric.
+   *  @param metricName the metric name
+   * @param metricObject the metric object
    */
-  public CountMetric(String metricName, long initCount) {
-    super(metricName, initCount);
+  public RatioMetric(String metricName, double metricObject) {
+    super(metricName, metricObject);
   }
 
-  /**
-   * Increment the metric by the input count.
-   *
-   * @param count
-   */
-  public abstract void increaseCount(long count);
-
   @Override
-  public String getMetricName() {
-    return _metricName;
+  public DynamicMetric getDynamicMetric() {
+    return this;
   }
 
   @Override
-  public String toString() {
-    return String.format("Metric %s's count is %d", getMetricName(), getValue());
+  public String getMetricName() {
+    return _metricName;
   }
 
   @Override
-  public long getLastEmittedMetricValue() {
+  public Double getLastEmittedMetricValue() {
     return getValue();
   }
 
   @Override
-  public DynamicMetric getDynamicMetric() {
-    return this;
+  public String toString() {
+    return String.format("Metric name: %s, metric value: %f", getMetricName(), getValue());
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
new file mode 100644
index 0000000..76b2318
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
@@ -0,0 +1,87 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.TestInputLoader;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class TestResourceUsageCalculator {
+  @Test(dataProvider = "TestMeasureBaselineDivergenceInput")
+  public void testMeasureBaselineDivergence(Map<String, Map<String, Map<String, String>>> baseline,
+      Map<String, Map<String, Map<String, String>>> someMatchBestPossible,
+      Map<String, Map<String, Map<String, String>>> noMatchBestPossible) {
+    Map<String, ResourceAssignment> baselineAssignment = buildResourceAssignment(baseline);
+    Map<String, ResourceAssignment> someMatchBestPossibleAssignment =
+        buildResourceAssignment(someMatchBestPossible);
+    Map<String, ResourceAssignment> noMatchBestPossibleAssignment =
+        buildResourceAssignment(noMatchBestPossible);
+
+    // Empty best possible assignment.
+    Assert.assertEquals(ResourceUsageCalculator
+        .measureBaselineDivergence(baselineAssignment, Collections.emptyMap()), 0.0d);
+    // Empty baseline assignment.
+    Assert.assertEquals(ResourceUsageCalculator
+        .measureBaselineDivergence(Collections.emptyMap(), noMatchBestPossibleAssignment), 0.0d);
+
+    Assert.assertEquals(ResourceUsageCalculator
+        .measureBaselineDivergence(baselineAssignment, noMatchBestPossibleAssignment), 0.0d);
+    Assert.assertEquals(ResourceUsageCalculator
+            .measureBaselineDivergence(baselineAssignment, someMatchBestPossibleAssignment),
+        (double) 1 / (double) 3);
+    Assert.assertEquals(
+        ResourceUsageCalculator.measureBaselineDivergence(baselineAssignment, baselineAssignment),
+        1.0d);
+  }
+
+  private Map<String, ResourceAssignment> buildResourceAssignment(
+      Map<String, Map<String, Map<String, String>>> resourceMap) {
+    Map<String, ResourceAssignment> assignment = new HashMap<>();
+    for (Map.Entry<String, Map<String, Map<String, String>>> resourceEntry
+        : resourceMap.entrySet()) {
+      ResourceAssignment resource = new ResourceAssignment(resourceEntry.getKey());
+      Map<String, Map<String, String>> partitionMap = resourceEntry.getValue();
+      for (Map.Entry<String, Map<String, String>> partitionEntry : partitionMap.entrySet()) {
+        resource.addReplicaMap(new Partition(partitionEntry.getKey()), partitionEntry.getValue());
+      }
+
+      assignment.put(resourceEntry.getKey(), resource);
+    }
+
+    return assignment;
+  }
+
+  @DataProvider(name = "TestMeasureBaselineDivergenceInput")
+  public Object[][] loadTestMeasureBaselineDivergenceInput() {
+    final String[] params =
+        new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
+    return TestInputLoader
+        .loadTestInputs("TestResourceUsageCalculator.MeasureBaselineDivergence.json", params);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index dd0cc8c..9b6ccbe 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -107,6 +107,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAllInstances()).thenReturn(_instances);
     }
 
     return testCache;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index dc0c89e..30700ed 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -25,9 +25,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import javax.management.JMException;
+
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.TestHelper;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -38,13 +42,15 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Resource;
 import org.apache.helix.monitoring.metrics.MetricCollector;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
   private static final String TEST_STRING = "TEST";
@@ -85,7 +91,52 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
 
     // Check that there exists a non-zero value in the metrics
     Assert.assertTrue(_metricCollector.getMetricMap().values().stream()
-        .anyMatch(metric -> metric.getLastEmittedMetricValue() > 0L));
+        .anyMatch(metric -> (long) metric.getLastEmittedMetricValue() > 0L));
+  }
+
+  @Test
+  public void testWagedRebalanceMetrics()
+      throws Exception {
+    _metadataStore.clearMetadataStore();
+    MetricCollector metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, metricCollector);
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+
+    Assert.assertEquals((long) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+        CountMetric.class).getLastEmittedMetricValue(), 0L);
+    Assert.assertEquals((long) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+        CountMetric.class).getLastEmittedMetricValue(), 0L);
+    Assert.assertEquals((double) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+        RatioMetric.class).getLastEmittedMetricValue(), 0.0d);
+
+    // Cluster config change will trigger baseline recalculation and partial rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+
+    rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+
+    Assert.assertEquals((long) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+        CountMetric.class).getLastEmittedMetricValue(), 1L);
+    Assert.assertEquals((long) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+        CountMetric.class).getLastEmittedMetricValue(), 1L);
+
+    // Wait for asyncReportBaselineDivergenceGauge to complete and verify.
+    Assert.assertTrue(TestHelper.verify(() -> (double) metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+        RatioMetric.class).getLastEmittedMetricValue() == 1.0d, TestHelper.WAIT_DURATION));
   }
 
   @Override
@@ -108,6 +159,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
     when(testCache.getIdealState(anyString())).thenAnswer(
         (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
     when(testCache.getIdealStates()).thenReturn(isMap);
+    when(testCache.getAsyncTasksThreadPool()).thenReturn(Executors.newSingleThreadExecutor());
 
     // Set up 2 more instances
     for (int i = 1; i < 3; i++) {
@@ -125,6 +177,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAllInstances()).thenReturn(_instances);
     }
 
     return testCache;
diff --git a/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json b/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json
new file mode 100644
index 0000000..dab432e
--- /dev/null
+++ b/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json
@@ -0,0 +1,37 @@
+[
+  {
+    "baseline": {
+      "resource1": {
+        "partition1": {
+          "instance1": "MASTER",
+          "instance2": "SLAVE"
+        },
+        "partition2": {
+          "instance2": "SLAVE"
+        }
+      }
+    },
+    "someMatchBestPossible": {
+      "resource1": {
+        "partition1": {
+          "instance1": "MASTER",
+          "instance3": "SLAVE"
+        },
+        "partition2": {
+          "instance3": "MASTER"
+        }
+      }
+    },
+    "noMatchBestPossible": {
+      "resource1": {
+        "partition1": {
+          "instance2": "MASTER",
+          "instance3": "SLAVE"
+        },
+        "partition2": {
+          "instance3": "MASTER"
+        }
+      }
+    }
+  }
+]