You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/01/22 18:58:17 UTC

[helix] branch wagedRebalancer updated: Add resource partition weight gauge (#686)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 61c1fe8  Add resource partition weight gauge (#686)
61c1fe8 is described below

commit 61c1fe8719eda7985cdb629ad3d8c501171ed3d1
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Wed Jan 22 10:58:07 2020 -0800

    Add resource partition weight gauge (#686)
    
    We would like to monitor how resource partitions use each capacity, via a gauge of the average partition weight for each capacity key.
    
    Change list:
    - Add partition weight gauge metric to resource monitor.
    - Add two unit tests to cover the new code.
---
 .../rebalancer/util/ResourceUsageCalculator.java   |  83 +++++
 .../stages/CurrentStateComputationStage.java       |  28 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  19 ++
 .../helix/monitoring/mbeans/ResourceMonitor.java   |  99 ++++--
 .../util/TestResourceUsageCalculator.java          |  18 +-
 .../monitoring/mbeans/TestResourceMonitor.java     | 371 +++++++++++++--------
 6 files changed, 446 insertions(+), 172 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
index ad58eae..e7a1b94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
@@ -142,4 +142,87 @@ public class ResourceUsageCalculator {
     return numTotalBestPossibleReplicas == 0 ? 1.0d
         : (1.0d - (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas);
   }
+
+  /**
+   * Calculates the average partition weight per capacity key for a resource. Example:
+   * Input =
+   * {
+   *   "partition1": {
+   *     "capacity1": 20,
+   *     "capacity2": 40
+   *   },
+   *   "partition2": {
+   *     "capacity1": 30,
+   *     "capacity2": 50
+   *   },
+   *   "partition3": {
+   *     "capacity1": 16,
+   *     "capacity2": 30
+   *   }
+   * }
+   *
+   * Total weight for key "capacity1" = 20 + 30 + 16 = 66;
+   * Total weight for key "capacity2" = 40 + 50 + 30 = 120;
+   * Partitions defining each key = 3;
+   * Average partition weight for "capacity1" = 66 / 3 = 22;
+   * Average partition weight for "capacity2" = 120 / 3 = 40;
+   *
+   * Output =
+   * {
+   *   "capacity1": 22,
+   *   "capacity2": 40
+   * }
+   *
+   * Notes:
+   * - The divisor for a key is the number of entries that contain that key, not the total number
+   *   of entries, so an entry that omits a key does not lower that key's average.
+   * - Weights are summed as a long and the average is computed with integer division, so the
+   *   result is truncated (e.g. weights 1 and 2 average to 1).
+   * - A null weight value causes a NullPointerException when it is unboxed.
+   *
+   * @param partitionCapacityMap A map of partition capacity:
+   *        <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+   * @return A new map of partition weight: capacity key -> average partition weight
+   */
+  public static Map<String, Integer> calculateAveragePartitionWeight(
+      Map<String, Map<String, Integer>> partitionCapacityMap) {
+    // capacity key -> accumulated [number of partitions defining the key, total weight]
+    Map<String, PartitionWeightCounterEntry> totalsByCapacityKey = new HashMap<>();
+    for (Map<String, Integer> capacityOfPartition : partitionCapacityMap.values()) {
+      for (Map.Entry<String, Integer> capacity : capacityOfPartition.entrySet()) {
+        totalsByCapacityKey
+            .computeIfAbsent(capacity.getKey(), key -> new PartitionWeightCounterEntry())
+            .increase(1, capacity.getValue());
+      }
+    }
+
+    // capacity key -> (total weight) / (number of partitions defining the key)
+    Map<String, Integer> averagePartitionWeightMap = new HashMap<>();
+    totalsByCapacityKey.forEach((capacityKey, totals) -> averagePartitionWeightMap
+        .put(capacityKey, (int) (totals.getWeight() / totals.getPartitions())));
+    return averagePartitionWeightMap;
+  }
+
+  /**
+   * Mutable accumulator holding, for a single capacity key, how many partitions declared that key
+   * and the sum of their weights. The sum is a long so that adding many int weights does not
+   * overflow in practice.
+   */
+  private static class PartitionWeightCounterEntry {
+    private int partitions;
+    private long weight;
+
+    private void increase(int partitionDelta, int weightDelta) {
+      partitions = partitions + partitionDelta;
+      weight = weight + weightDelta;
+    }
+
+    private int getPartitions() {
+      return partitions;
+    }
+
+    private long getWeight() {
+      return weight;
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 727804b..62fda33 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.helix.controller.LogUtil;
@@ -30,6 +31,7 @@ import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -41,6 +43,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,8 +92,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     final ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
     if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
-      reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
-          resourceToRebalance, currentStateOutput);
+      final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache;
+      reportInstanceCapacityMetrics(clusterStatusMonitor, dataProvider, resourceToRebalance,
+          currentStateOutput);
+      reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
+          clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
     }
   }
 
@@ -271,4 +277,22 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       return null;
     });
   }
+
+  /*
+   * Asynchronously reports the average partition weight per capacity key for every given resource
+   * config. Errors are logged (with stack trace) and never propagated to the pipeline.
+   */
+  private void reportResourcePartitionCapacityMetrics(ExecutorService executorService,
+      ClusterStatusMonitor clusterStatusMonitor, Collection<ResourceConfig> resourceConfigs) {
+    asyncExecute(executorService, () -> {
+      try {
+        for (ResourceConfig config : resourceConfigs) {
+          reportResourcePartitionWeight(clusterStatusMonitor, config);
+        }
+      } catch (Exception ex) {
+        // Only reached for failures outside a single resource, e.g. while iterating the configs.
+        LOG.error("Failed to report resource partition capacity metrics.", ex);
+      }
+
+      return null;
+    });
+  }
+
+  /*
+   * Reports the average partition weight of one resource. Failures are contained here so that a
+   * single malformed resource config does not stop reporting for the resources that follow it.
+   */
+  private void reportResourcePartitionWeight(ClusterStatusMonitor clusterStatusMonitor,
+      ResourceConfig config) {
+    String resourceName = config.getResourceName();
+    try {
+      Map<String, Integer> averageWeight = ResourceUsageCalculator
+          .calculateAveragePartitionWeight(config.getPartitionCapacityMap());
+      clusterStatusMonitor.updatePartitionWeight(resourceName, averageWeight);
+    } catch (Exception ex) {
+      // Pass the throwable itself so SLF4J logs the stack trace, not just the message.
+      LOG.error("Failed to report resource partition capacity metrics for resource: {}",
+          resourceName, ex);
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 96f85bf..fc0b19d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -497,6 +497,25 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
+  /**
+   * Updates the average partition weight metrics (one per capacity key) of a resource. The monitor
+   * is obtained through getOrCreateResourceMonitor, so one is created if the resource does not
+   * have a monitor yet. If no monitor can be obtained, a warning is logged and the update is
+   * dropped.
+   *
+   * @param resourceName The resource name for which partition weight is updated
+   * @param averageWeightMap A map of average partition weight of each capacity key:
+   *                         capacity key -> average partition weight
+   */
+  public void updatePartitionWeight(String resourceName, Map<String, Integer> averageWeightMap) {
+    ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
+    if (resourceMonitor != null) {
+      resourceMonitor.updatePartitionWeightStats(averageWeightMap);
+      return;
+    }
+    LOG.warn("Failed to update partition weight metric for resource: {} because resource monitor"
+        + " is not created.", resourceName);
+  }
+
   public void updateMissingTopStateDurationStats(String resourceName, long totalDuration,
       long helixLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d7a368e..af9c318 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,18 +19,19 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import javax.management.JMException;
 import javax.management.ObjectName;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import com.google.common.collect.Lists;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -49,6 +50,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     INTERMEDIATE_STATE_CAL_FAILED
   }
 
+  private static final String GAUGE_METRIC_SUFFIX = "Gauge";
+
   // Gauges
   private SimpleDynamicMetric<Long> _numOfPartitions;
   private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
@@ -83,31 +86,13 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private final String _clusterName;
   private final ObjectName _initObjectName;
 
+  // A map of dynamic capacity Gauges. The map's keys could change.
+  private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
+
   @Override
-  public ResourceMonitor register() throws JMException {
-    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
-    attributeList.add(_numOfPartitions);
-    attributeList.add(_numOfPartitionsInExternalView);
-    attributeList.add(_numOfErrorPartitions);
-    attributeList.add(_numNonTopStatePartitions);
-    attributeList.add(_numLessMinActiveReplicaPartitions);
-    attributeList.add(_numLessReplicaPartitions);
-    attributeList.add(_numPendingRecoveryRebalancePartitions);
-    attributeList.add(_numPendingLoadRebalancePartitions);
-    attributeList.add(_numRecoveryRebalanceThrottledPartitions);
-    attributeList.add(_numLoadRebalanceThrottledPartitions);
-    attributeList.add(_externalViewIdealStateDiff);
-    attributeList.add(_successfulTopStateHandoffDurationCounter);
-    attributeList.add(_successTopStateHandoffCounter);
-    attributeList.add(_failedTopStateHandoffCounter);
-    attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
-    attributeList.add(_partitionTopStateHandoffDurationGauge);
-    attributeList.add(_partitionTopStateHandoffHelixLatencyGauge);
-    attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
-    attributeList.add(_totalMessageReceived);
-    attributeList.add(_numPendingStateTransitions);
-    attributeList.add(_rebalanceState);
-    doRegister(attributeList, _initObjectName);
+  public DynamicMBeanProvider register() throws JMException {
+    doRegister(buildAttributeList(), _initObjectName);
+
     return this;
   }
 
@@ -116,10 +101,12 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   }
 
   @SuppressWarnings("unchecked")
-  public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
+  public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
+      throws JMException {
     _clusterName = clusterName;
     _resourceName = resourceName;
     _initObjectName = objectName;
+    _dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
 
     _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
     _numLoadRebalanceThrottledPartitions =
@@ -382,6 +369,36 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
+  /**
+   * Updates the per-capacity-key partition weight gauges. When the incoming capacity keys equal
+   * the current ones, only gauge values are updated. Otherwise the gauges are rebuilt (each named
+   * {@code capacityKey + "Gauge"}) and the MBean attribute info is refreshed, so removed keys
+   * disappear and new keys appear in the MBean server.
+   *
+   * @param partitionWeightMap A map of partition weight: capacity key -> partition weight
+   */
+  void updatePartitionWeightStats(Map<String, Integer> partitionWeightMap) {
+    synchronized (_dynamicCapacityMetricsMap) {
+      if (_dynamicCapacityMetricsMap.keySet().equals(partitionWeightMap.keySet())) {
+        // Same capacity keys as before: the existing attribute info is still valid.
+        for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
+          _dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
+        }
+        return;
+      }
+
+      // Capacity keys are changed, so capacity attribute map needs to be updated.
+      _dynamicCapacityMetricsMap.clear();
+      for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
+        String capacityKey = entry.getKey();
+        _dynamicCapacityMetricsMap.put(capacityKey,
+            new SimpleDynamicMetric<>(capacityKey + GAUGE_METRIC_SUFFIX, (long) entry.getValue()));
+      }
+
+      // Refresh the MBean attributes while still holding the lock. Doing it after releasing the
+      // lock would let a concurrent caller clear/refill the map while buildAttributeList() reads
+      // it, and a partial or stale attribute list could end up being published last.
+      updateAttributesInfo(buildAttributeList(),
+          "Resource monitor for resource: " + getResourceName());
+    }
+  }
+
   public void setRebalanceState(RebalanceStatus state) {
     _rebalanceState.updateValue(state.name());
   }
@@ -428,4 +445,34 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       _lastResetTime = System.currentTimeMillis();
     }
   }
+
+  /*
+   * Assembles every metric exposed as an MBean attribute: the fixed resource metrics in a stable
+   * order, followed by whichever dynamic capacity gauges currently exist.
+   */
+  private List<DynamicMetric<?, ?>> buildAttributeList() {
+    List<DynamicMetric<?, ?>> attributes = Lists.newArrayList();
+    Collections.<DynamicMetric<?, ?>>addAll(attributes,
+        _numOfPartitions,
+        _numOfPartitionsInExternalView,
+        _numOfErrorPartitions,
+        _numNonTopStatePartitions,
+        _numLessMinActiveReplicaPartitions,
+        _numLessReplicaPartitions,
+        _numPendingRecoveryRebalancePartitions,
+        _numPendingLoadRebalancePartitions,
+        _numRecoveryRebalanceThrottledPartitions,
+        _numLoadRebalanceThrottledPartitions,
+        _externalViewIdealStateDiff,
+        _successfulTopStateHandoffDurationCounter,
+        _successTopStateHandoffCounter,
+        _failedTopStateHandoffCounter,
+        _maxSinglePartitionTopStateHandoffDuration,
+        _partitionTopStateHandoffDurationGauge,
+        _partitionTopStateHandoffHelixLatencyGauge,
+        _partitionTopStateNonGracefulHandoffDurationGauge,
+        _totalMessageReceived,
+        _numPendingStateTransitions,
+        _rebalanceState);
+    // Capacity gauges go last; their set of keys may change between calls.
+    attributes.addAll(_dynamicCapacityMetricsMap.values());
+    return attributes;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
index b0b2142..ef1737f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
@@ -60,6 +60,22 @@ public class TestResourceUsageCalculator {
         0.0d);
   }
 
+  @Test
+  public void testCalculateAveragePartitionWeight() {
+    // capacity1: (20 + 30 + 16) / 3 = 22; capacity2: (40 + 50 + 30) / 3 = 40
+    Map<String, Integer> expected = ImmutableMap.of("capacity1", 22, "capacity2", 40);
+
+    Map<String, Map<String, Integer>> input = ImmutableMap.of(
+        "partition1", ImmutableMap.of("capacity1", 20, "capacity2", 40),
+        "partition2", ImmutableMap.of("capacity1", 30, "capacity2", 50),
+        "partition3", ImmutableMap.of("capacity1", 16, "capacity2", 30));
+    Map<String, Integer> actual = ResourceUsageCalculator.calculateAveragePartitionWeight(input);
+
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(actual, expected);
+  }
+
   private Map<String, ResourceAssignment> buildResourceAssignment(
       Map<String, Map<String, Map<String, String>>> resourceMap) {
     Map<String, ResourceAssignment> assignment = new HashMap<>();
@@ -78,7 +94,7 @@ public class TestResourceUsageCalculator {
   }
 
   @DataProvider(name = "TestMeasureBaselineDivergenceInput")
-  public Object[][] loadTestMeasureBaselineDivergenceInput() {
+  private Object[][] loadTestMeasureBaselineDivergenceInput() {
     final String[] params =
         new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
     return TestInputLoader
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 18576dd..f630124 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -19,15 +19,24 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
 import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
@@ -46,168 +55,220 @@ public class TestResourceMonitor {
   @Test()
   public void testReportData() throws JMException {
     final int n = 5;
-    ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
+    ResourceMonitor monitor =
+        new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
     monitor.register();
 
-    List<String> instances = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
-      String instance = "localhost_" + (12918 + i);
-      instances.add(instance);
-    }
+    try {
+      List<String> instances = new ArrayList<>();
+      for (int i = 0; i < n; i++) {
+        String instance = "localhost_" + (12918 + i);
+        instances.add(instance);
+      }
+
+      ZNRecord idealStateRecord = DefaultIdealStateCalculator
+          .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE");
+      IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord));
+      idealState.setMinActiveReplicas(_replicas - 1);
+      ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+      StateModelDefinition stateModelDef =
+          BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
+
+      monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+      Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+      Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+      Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName);
 
-    ZNRecord idealStateRecord = DefaultIdealStateCalculator
-        .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE");
-    IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord));
-    idealState.setMinActiveReplicas(_replicas - 1);
-    ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
-    StateModelDefinition stateModelDef =
-        BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
-
-    monitor.updateResourceState(externalView, idealState, stateModelDef);
-
-    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-    Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName);
-
-    int errorCount = 5;
-    Random r = new Random();
-    int start = r.nextInt(_partitions - errorCount - 1);
-    for (int i = start; i < start + errorCount; i++) {
-      String partition = _dbName + "_" + i;
-      Map<String, String> map = externalView.getStateMap(partition);
-      for (String key : map.keySet()) {
-        if (map.get(key).equalsIgnoreCase("SLAVE")) {
-          map.put(key, "ERROR");
-          break;
+      int errorCount = 5;
+      Random r = new Random();
+      int start = r.nextInt(_partitions - errorCount - 1);
+      for (int i = start; i < start + errorCount; i++) {
+        String partition = _dbName + "_" + i;
+        Map<String, String> map = externalView.getStateMap(partition);
+        for (String key : map.keySet()) {
+          if (map.get(key).equalsIgnoreCase("SLAVE")) {
+            map.put(key, "ERROR");
+            break;
+          }
         }
+        externalView.setStateMap(partition, map);
       }
-      externalView.setStateMap(partition, map);
-    }
 
-    monitor.updateResourceState(externalView, idealState, stateModelDef);
-
-    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
-    Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
-    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount);
-    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
-    int lessMinActiveReplica = 6;
-    externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
-    start = r.nextInt(_partitions - lessMinActiveReplica - 1);
-    for (int i = start; i < start + lessMinActiveReplica; i++) {
-      String partition = _dbName + "_" + i;
-      Map<String, String> map = externalView.getStateMap(partition);
-      Iterator<String> it = map.keySet().iterator();
-      int flag = 0;
-      while (it.hasNext()) {
-        String key = it.next();
-        if (map.get(key).equalsIgnoreCase("SLAVE")) {
-          if (flag++ % 2 == 0) {
-            map.put(key, "OFFLINE");
-          } else {
-            it.remove();
+      monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+      Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
+      Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
+      Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount);
+      Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+      int lessMinActiveReplica = 6;
+      externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+      start = r.nextInt(_partitions - lessMinActiveReplica - 1);
+      for (int i = start; i < start + lessMinActiveReplica; i++) {
+        String partition = _dbName + "_" + i;
+        Map<String, String> map = externalView.getStateMap(partition);
+        Iterator<String> it = map.keySet().iterator();
+        int flag = 0;
+        while (it.hasNext()) {
+          String key = it.next();
+          if (map.get(key).equalsIgnoreCase("SLAVE")) {
+            if (flag++ % 2 == 0) {
+              map.put(key, "OFFLINE");
+            } else {
+              it.remove();
+            }
           }
         }
+        externalView.setStateMap(partition, map);
       }
-      externalView.setStateMap(partition, map);
-    }
 
-    monitor.updateResourceState(externalView, idealState, stateModelDef);
-
-    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
-    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
-    Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica);
-    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
-    int lessReplica = 4;
-    externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
-    start = r.nextInt(_partitions - lessReplica - 1);
-    for (int i = start; i < start + lessReplica; i++) {
-      String partition = _dbName + "_" + i;
-      Map<String, String> map = externalView.getStateMap(partition);
-      int flag = 0;
-      Iterator<String> it = map.keySet().iterator();
-      while (it.hasNext()) {
-        String key = it.next();
-        if (map.get(key).equalsIgnoreCase("SLAVE")) {
-          if (flag++ % 2 == 0) {
-            map.put(key, "OFFLINE");
-          } else {
-            it.remove();
+      monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+      Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
+      Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
+      Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica);
+      Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+      int lessReplica = 4;
+      externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+      start = r.nextInt(_partitions - lessReplica - 1);
+      for (int i = start; i < start + lessReplica; i++) {
+        String partition = _dbName + "_" + i;
+        Map<String, String> map = externalView.getStateMap(partition);
+        int flag = 0;
+        Iterator<String> it = map.keySet().iterator();
+        while (it.hasNext()) {
+          String key = it.next();
+          if (map.get(key).equalsIgnoreCase("SLAVE")) {
+            if (flag++ % 2 == 0) {
+              map.put(key, "OFFLINE");
+            } else {
+              it.remove();
+            }
+            break;
           }
-          break;
         }
+        externalView.setStateMap(partition, map);
       }
-      externalView.setStateMap(partition, map);
-    }
 
-    monitor.updateResourceState(externalView, idealState, stateModelDef);
-
-    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
-    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica);
-    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
-    int missTopState = 7;
-    externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
-    start = r.nextInt(_partitions - missTopState - 1);
-    for (int i = start; i < start + missTopState; i++) {
-      String partition = _dbName + "_" + i;
-      Map<String, String> map = externalView.getStateMap(partition);
-      int flag = 0;
-      for (String key : map.keySet()) {
-        if (map.get(key).equalsIgnoreCase("MASTER")) {
-          if (flag++ % 2 == 0) {
-            map.put(key, "OFFLINE");
-          } else {
-            map.remove(key);
+      monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+      Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
+      Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica);
+      Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+      int missTopState = 7;
+      externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+      start = r.nextInt(_partitions - missTopState - 1);
+      for (int i = start; i < start + missTopState; i++) {
+        String partition = _dbName + "_" + i;
+        Map<String, String> map = externalView.getStateMap(partition);
+        int flag = 0;
+        for (String key : map.keySet()) {
+          if (map.get(key).equalsIgnoreCase("MASTER")) {
+            if (flag++ % 2 == 0) {
+              map.put(key, "OFFLINE");
+            } else {
+              map.remove(key);
+            }
+            break;
           }
-          break;
         }
+        externalView.setStateMap(partition, map);
       }
-      externalView.setStateMap(partition, map);
+
+      monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+      Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
+      Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+      Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+      Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState);
+      Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
+
+      Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0);
+      // test pending state transition message report and read
+      int messageCount = new Random().nextInt(_partitions) + 1;
+      monitor.updatePendingStateTransitionMessages(messageCount);
+      Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
+
+      Assert.assertEquals(monitor.getRebalanceState(),
+          ResourceMonitor.RebalanceStatus.UNKNOWN.name());
+      monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
+      Assert
+          .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
+      monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
+      Assert.assertEquals(monitor.getRebalanceState(),
+          ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
+      monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+      Assert.assertEquals(monitor.getRebalanceState(),
+          ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
+    } finally {
+      // Has to unregister this monitor to clean up. Otherwise, later tests may be affected and fail.
+      monitor.unregister();
     }
+  }
+
+  /**
+   * Verifies that partition weight stats pushed into a {@link ResourceMonitor} are exposed as
+   * {@code <capacityKey>Gauge} MBean attributes. It also checks that re-publishing with a different
+   * set of capacity keys drops the gauge of the key that disappeared, and that the monitor is
+   * unregistered cleanly afterwards.
+   */
+  @Test
+  public void testUpdatePartitionWeightStats() throws JMException, IOException {
+    final MBeanServerConnection mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    final String clusterName = TestHelper.getTestMethodName();
+    final String resource = "testDB";
+    // Test-only ObjectName; the assertions below read the gauges through this name.
+    final ObjectName resourceObjectName = new ObjectName("testDomain:key=value");
+    final ResourceMonitor monitor =
+        new ResourceMonitor(clusterName, resource, resourceObjectName);
+    monitor.register();
+
+    try {
+      // resource name -> (capacity key -> partition weight)
+      Map<String, Map<String, Integer>> partitionWeightMap =
+          ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity2", 40));
+
+      // Publish the initial weights (capacity1, capacity2) to the monitor.
+      partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats);
+
+      verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap);
+
+      // Swap the capacity keys: "capacity2" is dropped and "capacity3" is introduced.
+      partitionWeightMap =
+          ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity3", 60));
+
+      // Re-publish the weights with the new key set.
+      partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats);
 
-    monitor.updateResourceState(externalView, idealState, stateModelDef);
-
-    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
-    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
-    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
-    Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState);
-    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
-
-    Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0);
-    // test pending state transition message report and read
-    int messageCount = new Random().nextInt(_partitions) + 1;
-    monitor.updatePendingStateTransitionMessages(messageCount);
-    Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
-
-    Assert
-        .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name());
-    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
-    Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
-    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
-    Assert.assertEquals(monitor.getRebalanceState(),
-        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
-    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
-    Assert.assertEquals(monitor.getRebalanceState(),
-        ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
+      // The gauges must now reflect the new key set (capacity1, capacity3).
+      verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap);
+
+      // The gauge for the dropped key "capacity2" must no longer be readable.
+      String removedAttribute = "capacity2Gauge";
+      try {
+        mBeanServer.getAttribute(resourceObjectName, removedAttribute);
+        Assert.fail("AttributeNotFoundException should be thrown because attribute [capacity2Gauge]"
+            + " is removed.");
+      } catch (AttributeNotFoundException expected) {
+        // Expected: the stale gauge was removed from the MBean.
+      }
+    } finally {
+      // Always unregister so the MBean does not leak into later tests.
+      monitor.unregister();
+      Assert.assertFalse(mBeanServer.isRegistered(resourceObjectName),
+          "Failed to unregister resource monitor.");
+    }
   }
 
   /**
@@ -240,4 +301,28 @@ public class TestResourceMonitor {
 
     return copy;
   }
+
+  /**
+   * Asserts that every expected capacity key is exposed on {@code objectName} as a
+   * {@code <capacityKey>Gauge} attribute whose value equals the expected partition weight.
+   *
+   * @param mBeanServer connection used to read the monitor's attributes
+   * @param objectName name under which the resource monitor is registered
+   * @param expectedPartitionWeightMap resource name -> (capacity key -> expected weight). Only the
+   *     inner maps are checked, because every gauge is read from the same {@code objectName}.
+   */
+  private void verifyPartitionWeightMetrics(MBeanServerConnection mBeanServer,
+      ObjectName objectName, Map<String, Map<String, Integer>> expectedPartitionWeightMap)
+      throws IOException, AttributeNotFoundException, MBeanException, ReflectionException,
+             InstanceNotFoundException {
+    final String gaugeMetricSuffix = "Gauge";
+    // The monitor MBean must be registered before any of its attributes can be read.
+    Assert.assertTrue(mBeanServer.isRegistered(objectName),
+        "Resource monitor " + objectName + " is not registered.");
+
+    for (Map<String, Integer> capacityWeights : expectedPartitionWeightMap.values()) {
+      for (Map.Entry<String, Integer> capacityEntry : capacityWeights.entrySet()) {
+        String attributeName = capacityEntry.getKey() + gaugeMetricSuffix;
+        boolean attributeReadable;
+        try {
+          // Wait until the attribute is registered with the MBean server
+          // (2000 is the timeout handed to TestHelper.verify; presumably milliseconds).
+          attributeReadable = TestHelper.verify(
+              () -> !mBeanServer.getAttributes(objectName, new String[]{attributeName}).isEmpty(),
+              2000);
+        } catch (Exception e) {
+          // Report a failure while polling as a test failure, keeping the underlying cause.
+          throw new AssertionError("Failed while waiting for attribute " + attributeName, e);
+        }
+        Assert.assertTrue(attributeReadable,
+            "Attribute " + attributeName + " did not become readable in time.");
+        Assert.assertEquals((long) mBeanServer.getAttribute(objectName, attributeName),
+            (long) capacityEntry.getValue());
+      }
+    }
+  }
 }