You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/01/22 18:58:17 UTC
[helix] branch wagedRebalancer updated: Add resource partition weight gauge (#686)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 61c1fe8 Add resource partition weight gauge (#686)
61c1fe8 is described below
commit 61c1fe8719eda7985cdb629ad3d8c501171ed3d1
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Wed Jan 22 10:58:07 2020 -0800
Add resource partition weight gauge (#686)
We would like to monitor how much of each capacity the partitions of a resource use, so this change adds a gauge that reports the average partition weight for each CAPACITY key.
Change list:
- Add partition weight gauge metric to resource monitor.
- Add two unit tests to cover new code.
---
.../rebalancer/util/ResourceUsageCalculator.java | 83 +++++
.../stages/CurrentStateComputationStage.java | 28 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 19 ++
.../helix/monitoring/mbeans/ResourceMonitor.java | 99 ++++--
.../util/TestResourceUsageCalculator.java | 18 +-
.../monitoring/mbeans/TestResourceMonitor.java | 371 +++++++++++++--------
6 files changed, 446 insertions(+), 172 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
index ad58eae..e7a1b94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
@@ -142,4 +142,87 @@ public class ResourceUsageCalculator {
return numTotalBestPossibleReplicas == 0 ? 1.0d
: (1.0d - (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas);
}
+
+ /**
+ * Calculates the average partition weight per capacity key for a resource config. For example:
+ * Input =
+ * {
+ * "partition1": {
+ * "capacity1": 20,
+ * "capacity2": 40
+ * },
+ * "partition2": {
+ * "capacity1": 30,
+ * "capacity2": 50
+ * },
+ * "partition3": {
+ * "capacity1": 16,
+ * "capacity2": 30
+ * }
+ * }
+ *
+ * Total weight for key "capacity1" = 20 + 30 + 16 = 66;
+ * Total weight for key "capacity2" = 40 + 50 + 30 = 120;
+ * Total partitions = 3;
+ * Average partition weight for "capacity1" = 66 / 3 = 22;
+ * Average partition weight for "capacity2" = 120 / 3 = 40;
+ *
+ * Output =
+ * {
+ * "capacity1": 22,
+ * "capacity2": 40
+ * }
+ *
+ * @param partitionCapacityMap A map of partition capacity:
+ * <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+ * @return A map of partition weight: capacity key -> average partition weight
+ */
+ public static Map<String, Integer> calculateAveragePartitionWeight(
+ Map<String, Map<String, Integer>> partitionCapacityMap) {
+ // capacity key -> [number of partitions, total weight per capacity key]
+ Map<String, PartitionWeightCounterEntry> countPartitionWeightMap = new HashMap<>();
+
+ // Aggregates partition weight for each capacity key.
+ partitionCapacityMap.values().forEach(partitionCapacityEntry ->
+ partitionCapacityEntry.forEach((capacityKey, weight) -> countPartitionWeightMap
+ .computeIfAbsent(capacityKey, counterEntry -> new PartitionWeightCounterEntry())
+ .increase(1, weight)));
+
+ // capacity key -> average partition weight
+ Map<String, Integer> averagePartitionWeightMap = new HashMap<>();
+
+ // Calculate average partition weight for each capacity key.
+ // Per capacity key (integer division, so any fractional part is dropped):
+ // average partition weight = (total weight for the key) / (number of partitions with the key)
+ for (Map.Entry<String, PartitionWeightCounterEntry> entry
+ : countPartitionWeightMap.entrySet()) {
+ String capacityKey = entry.getKey();
+ PartitionWeightCounterEntry weightEntry = entry.getValue();
+ int averageWeight = (int) (weightEntry.getWeight() / weightEntry.getPartitions());
+ averagePartitionWeightMap.put(capacityKey, averageWeight);
+ }
+
+ return averagePartitionWeightMap;
+ }
+
+ /*
+ * Represents total number of partitions and total partition weight for a capacity key.
+ */
+ private static class PartitionWeightCounterEntry {
+ private int partitions;
+ private long weight;
+
+ private int getPartitions() {
+ return partitions;
+ }
+
+ private long getWeight() {
+ return weight;
+ }
+
+ private void increase(int partitions, int weight) {
+ this.partitions += partitions;
+ this.weight += weight;
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 727804b..62fda33 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.helix.controller.LogUtil;
@@ -30,6 +31,7 @@ import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -41,6 +43,7 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,8 +92,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
- reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
- resourceToRebalance, currentStateOutput);
+ final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache;
+ reportInstanceCapacityMetrics(clusterStatusMonitor, dataProvider, resourceToRebalance,
+ currentStateOutput);
+ reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
+ clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
}
}
@@ -271,4 +277,22 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
return null;
});
}
+
+ private void reportResourcePartitionCapacityMetrics(ExecutorService executorService,
+ ClusterStatusMonitor clusterStatusMonitor, Collection<ResourceConfig> resourceConfigs) {
+ asyncExecute(executorService, () -> {
+ try {
+ for (ResourceConfig config : resourceConfigs) {
+ Map<String, Integer> averageWeight = ResourceUsageCalculator
+ .calculateAveragePartitionWeight(config.getPartitionCapacityMap());
+ clusterStatusMonitor.updatePartitionWeight(config.getResourceName(), averageWeight);
+ }
+ } catch (Exception ex) {
+ LOG.error("Failed to report resource partition capacity metrics. Exception message: {}",
+ ex.getMessage());
+ }
+
+ return null;
+ });
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 96f85bf..fc0b19d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -497,6 +497,25 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
+ /**
+ * Updates metrics of average partition weight per capacity key for a resource. If a resource
+ * monitor does not yet exist for this resource, a new resource monitor will be created for this
+ * resource.
+ *
+ * @param resourceName The resource name for which partition weight is updated
+ * @param averageWeightMap A map of average partition weight of each capacity key:
+ * capacity key -> average partition weight
+ */
+ public void updatePartitionWeight(String resourceName, Map<String, Integer> averageWeightMap) {
+ ResourceMonitor monitor = getOrCreateResourceMonitor(resourceName);
+ if (monitor == null) {
+ LOG.warn("Failed to update partition weight metric for resource: {} because resource monitor"
+ + " is not created.", resourceName);
+ return;
+ }
+ monitor.updatePartitionWeightStats(averageWeightMap);
+ }
+
public void updateMissingTopStateDurationStats(String resourceName, long totalDuration,
long helixLatency, boolean isGraceful, boolean succeeded) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d7a368e..af9c318 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,18 +19,19 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.ObjectName;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import com.google.common.collect.Lists;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -49,6 +50,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
INTERMEDIATE_STATE_CAL_FAILED
}
+ private static final String GAUGE_METRIC_SUFFIX = "Gauge";
+
// Gauges
private SimpleDynamicMetric<Long> _numOfPartitions;
private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
@@ -83,31 +86,13 @@ public class ResourceMonitor extends DynamicMBeanProvider {
private final String _clusterName;
private final ObjectName _initObjectName;
+ // A map of dynamic capacity Gauges. The map's keys could change.
+ private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
+
@Override
- public ResourceMonitor register() throws JMException {
- List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
- attributeList.add(_numOfPartitions);
- attributeList.add(_numOfPartitionsInExternalView);
- attributeList.add(_numOfErrorPartitions);
- attributeList.add(_numNonTopStatePartitions);
- attributeList.add(_numLessMinActiveReplicaPartitions);
- attributeList.add(_numLessReplicaPartitions);
- attributeList.add(_numPendingRecoveryRebalancePartitions);
- attributeList.add(_numPendingLoadRebalancePartitions);
- attributeList.add(_numRecoveryRebalanceThrottledPartitions);
- attributeList.add(_numLoadRebalanceThrottledPartitions);
- attributeList.add(_externalViewIdealStateDiff);
- attributeList.add(_successfulTopStateHandoffDurationCounter);
- attributeList.add(_successTopStateHandoffCounter);
- attributeList.add(_failedTopStateHandoffCounter);
- attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
- attributeList.add(_partitionTopStateHandoffDurationGauge);
- attributeList.add(_partitionTopStateHandoffHelixLatencyGauge);
- attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
- attributeList.add(_totalMessageReceived);
- attributeList.add(_numPendingStateTransitions);
- attributeList.add(_rebalanceState);
- doRegister(attributeList, _initObjectName);
+ public DynamicMBeanProvider register() throws JMException {
+ doRegister(buildAttributeList(), _initObjectName);
+
return this;
}
@@ -116,10 +101,12 @@ public class ResourceMonitor extends DynamicMBeanProvider {
}
@SuppressWarnings("unchecked")
- public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
+ public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
+ throws JMException {
_clusterName = clusterName;
_resourceName = resourceName;
_initObjectName = objectName;
+ _dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
_externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
_numLoadRebalanceThrottledPartitions =
@@ -382,6 +369,36 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
}
+ /**
+ * Updates the partition weight metrics. If the set of capacity keys has changed, all MBean
+ * attributes are rebuilt accordingly: the old capacity keys are replaced with the new capacity
+ * keys in the MBean server.
+ *
+ * @param partitionWeightMap A map of partition weight: capacity key -> partition weight
+ */
+ void updatePartitionWeightStats(Map<String, Integer> partitionWeightMap) {
+ synchronized (_dynamicCapacityMetricsMap) {
+ if (_dynamicCapacityMetricsMap.keySet().equals(partitionWeightMap.keySet())) {
+ for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
+ _dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
+ }
+ return;
+ }
+
+ // The capacity keys have changed, so the capacity attribute map needs to be rebuilt.
+ _dynamicCapacityMetricsMap.clear();
+ for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
+ String capacityKey = entry.getKey();
+ _dynamicCapacityMetricsMap.put(capacityKey,
+ new SimpleDynamicMetric<>(capacityKey + GAUGE_METRIC_SUFFIX, (long) entry.getValue()));
+ }
+ }
+
+ // Update all MBean attributes.
+ updateAttributesInfo(buildAttributeList(),
+ "Resource monitor for resource: " + getResourceName());
+ }
+
public void setRebalanceState(RebalanceStatus state) {
_rebalanceState.updateValue(state.name());
}
@@ -428,4 +445,34 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_lastResetTime = System.currentTimeMillis();
}
}
+
+ private List<DynamicMetric<?, ?>> buildAttributeList() {
+ List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
+ _numOfPartitions,
+ _numOfPartitionsInExternalView,
+ _numOfErrorPartitions,
+ _numNonTopStatePartitions,
+ _numLessMinActiveReplicaPartitions,
+ _numLessReplicaPartitions,
+ _numPendingRecoveryRebalancePartitions,
+ _numPendingLoadRebalancePartitions,
+ _numRecoveryRebalanceThrottledPartitions,
+ _numLoadRebalanceThrottledPartitions,
+ _externalViewIdealStateDiff,
+ _successfulTopStateHandoffDurationCounter,
+ _successTopStateHandoffCounter,
+ _failedTopStateHandoffCounter,
+ _maxSinglePartitionTopStateHandoffDuration,
+ _partitionTopStateHandoffDurationGauge,
+ _partitionTopStateHandoffHelixLatencyGauge,
+ _partitionTopStateNonGracefulHandoffDurationGauge,
+ _totalMessageReceived,
+ _numPendingStateTransitions,
+ _rebalanceState
+ );
+
+ attributeList.addAll(_dynamicCapacityMetricsMap.values());
+
+ return attributeList;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
index b0b2142..ef1737f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
@@ -60,6 +60,22 @@ public class TestResourceUsageCalculator {
0.0d);
}
+ @Test
+ public void testCalculateAveragePartitionWeight() {
+ Map<String, Map<String, Integer>> partitionCapacityMap = ImmutableMap.of(
+ "partition1", ImmutableMap.of("capacity1", 20, "capacity2", 40),
+ "partition2", ImmutableMap.of("capacity1", 30, "capacity2", 50),
+ "partition3", ImmutableMap.of("capacity1", 16, "capacity2", 30));
+
+ Map<String, Integer> averageCapacityWeightMap =
+ ResourceUsageCalculator.calculateAveragePartitionWeight(partitionCapacityMap);
+ Map<String, Integer> expectedAverageWeightMap =
+ ImmutableMap.of("capacity1", 22, "capacity2", 40);
+
+ Assert.assertNotNull(averageCapacityWeightMap);
+ Assert.assertEquals(averageCapacityWeightMap, expectedAverageWeightMap);
+ }
+
private Map<String, ResourceAssignment> buildResourceAssignment(
Map<String, Map<String, Map<String, String>>> resourceMap) {
Map<String, ResourceAssignment> assignment = new HashMap<>();
@@ -78,7 +94,7 @@ public class TestResourceUsageCalculator {
}
@DataProvider(name = "TestMeasureBaselineDivergenceInput")
- public Object[][] loadTestMeasureBaselineDivergenceInput() {
+ private Object[][] loadTestMeasureBaselineDivergenceInput() {
final String[] params =
new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
return TestInputLoader
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 18576dd..f630124 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -19,15 +19,24 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
@@ -46,168 +55,220 @@ public class TestResourceMonitor {
@Test()
public void testReportData() throws JMException {
final int n = 5;
- ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
+ ResourceMonitor monitor =
+ new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
monitor.register();
- List<String> instances = new ArrayList<>();
- for (int i = 0; i < n; i++) {
- String instance = "localhost_" + (12918 + i);
- instances.add(instance);
- }
+ try {
+ List<String> instances = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ String instance = "localhost_" + (12918 + i);
+ instances.add(instance);
+ }
+
+ ZNRecord idealStateRecord = DefaultIdealStateCalculator
+ .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord));
+ idealState.setMinActiveReplicas(_replicas - 1);
+ ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+ StateModelDefinition stateModelDef =
+ BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
+
+ monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+ Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName);
- ZNRecord idealStateRecord = DefaultIdealStateCalculator
- .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE");
- IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord));
- idealState.setMinActiveReplicas(_replicas - 1);
- ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
- StateModelDefinition stateModelDef =
- BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
-
- monitor.updateResourceState(externalView, idealState, stateModelDef);
-
- Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
- Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
- Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0);
- Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
- Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName);
-
- int errorCount = 5;
- Random r = new Random();
- int start = r.nextInt(_partitions - errorCount - 1);
- for (int i = start; i < start + errorCount; i++) {
- String partition = _dbName + "_" + i;
- Map<String, String> map = externalView.getStateMap(partition);
- for (String key : map.keySet()) {
- if (map.get(key).equalsIgnoreCase("SLAVE")) {
- map.put(key, "ERROR");
- break;
+ int errorCount = 5;
+ Random r = new Random();
+ int start = r.nextInt(_partitions - errorCount - 1);
+ for (int i = start; i < start + errorCount; i++) {
+ String partition = _dbName + "_" + i;
+ Map<String, String> map = externalView.getStateMap(partition);
+ for (String key : map.keySet()) {
+ if (map.get(key).equalsIgnoreCase("SLAVE")) {
+ map.put(key, "ERROR");
+ break;
+ }
}
+ externalView.setStateMap(partition, map);
}
- externalView.setStateMap(partition, map);
- }
- monitor.updateResourceState(externalView, idealState, stateModelDef);
-
- Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
- Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
- Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
- Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount);
- Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
- int lessMinActiveReplica = 6;
- externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
- start = r.nextInt(_partitions - lessMinActiveReplica - 1);
- for (int i = start; i < start + lessMinActiveReplica; i++) {
- String partition = _dbName + "_" + i;
- Map<String, String> map = externalView.getStateMap(partition);
- Iterator<String> it = map.keySet().iterator();
- int flag = 0;
- while (it.hasNext()) {
- String key = it.next();
- if (map.get(key).equalsIgnoreCase("SLAVE")) {
- if (flag++ % 2 == 0) {
- map.put(key, "OFFLINE");
- } else {
- it.remove();
+ monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount);
+ Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+ int lessMinActiveReplica = 6;
+ externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+ start = r.nextInt(_partitions - lessMinActiveReplica - 1);
+ for (int i = start; i < start + lessMinActiveReplica; i++) {
+ String partition = _dbName + "_" + i;
+ Map<String, String> map = externalView.getStateMap(partition);
+ Iterator<String> it = map.keySet().iterator();
+ int flag = 0;
+ while (it.hasNext()) {
+ String key = it.next();
+ if (map.get(key).equalsIgnoreCase("SLAVE")) {
+ if (flag++ % 2 == 0) {
+ map.put(key, "OFFLINE");
+ } else {
+ it.remove();
+ }
}
}
+ externalView.setStateMap(partition, map);
}
- externalView.setStateMap(partition, map);
- }
- monitor.updateResourceState(externalView, idealState, stateModelDef);
-
- Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
- Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
- Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
- Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica);
- Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
- int lessReplica = 4;
- externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
- start = r.nextInt(_partitions - lessReplica - 1);
- for (int i = start; i < start + lessReplica; i++) {
- String partition = _dbName + "_" + i;
- Map<String, String> map = externalView.getStateMap(partition);
- int flag = 0;
- Iterator<String> it = map.keySet().iterator();
- while (it.hasNext()) {
- String key = it.next();
- if (map.get(key).equalsIgnoreCase("SLAVE")) {
- if (flag++ % 2 == 0) {
- map.put(key, "OFFLINE");
- } else {
- it.remove();
+ monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
+ Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica);
+ Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+ int lessReplica = 4;
+ externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+ start = r.nextInt(_partitions - lessReplica - 1);
+ for (int i = start; i < start + lessReplica; i++) {
+ String partition = _dbName + "_" + i;
+ Map<String, String> map = externalView.getStateMap(partition);
+ int flag = 0;
+ Iterator<String> it = map.keySet().iterator();
+ while (it.hasNext()) {
+ String key = it.next();
+ if (map.get(key).equalsIgnoreCase("SLAVE")) {
+ if (flag++ % 2 == 0) {
+ map.put(key, "OFFLINE");
+ } else {
+ it.remove();
+ }
+ break;
}
- break;
}
+ externalView.setStateMap(partition, map);
}
- externalView.setStateMap(partition, map);
- }
- monitor.updateResourceState(externalView, idealState, stateModelDef);
-
- Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
- Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
- Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
- Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica);
- Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
-
- int missTopState = 7;
- externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
- start = r.nextInt(_partitions - missTopState - 1);
- for (int i = start; i < start + missTopState; i++) {
- String partition = _dbName + "_" + i;
- Map<String, String> map = externalView.getStateMap(partition);
- int flag = 0;
- for (String key : map.keySet()) {
- if (map.get(key).equalsIgnoreCase("MASTER")) {
- if (flag++ % 2 == 0) {
- map.put(key, "OFFLINE");
- } else {
- map.remove(key);
+ monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica);
+ Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+
+ int missTopState = 7;
+ externalView = new ExternalView(deepCopyZNRecord(idealStateRecord));
+ start = r.nextInt(_partitions - missTopState - 1);
+ for (int i = start; i < start + missTopState; i++) {
+ String partition = _dbName + "_" + i;
+ Map<String, String> map = externalView.getStateMap(partition);
+ int flag = 0;
+ for (String key : map.keySet()) {
+ if (map.get(key).equalsIgnoreCase("MASTER")) {
+ if (flag++ % 2 == 0) {
+ map.put(key, "OFFLINE");
+ } else {
+ map.remove(key);
+ }
+ break;
}
- break;
}
+ externalView.setStateMap(partition, map);
}
- externalView.setStateMap(partition, map);
+
+ monitor.updateResourceState(externalView, idealState, stateModelDef);
+
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState);
+ Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
+
+ Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0);
+ // test pending state transition message report and read
+ int messageCount = new Random().nextInt(_partitions) + 1;
+ monitor.updatePendingStateTransitionMessages(messageCount);
+ Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
+
+ Assert.assertEquals(monitor.getRebalanceState(),
+ ResourceMonitor.RebalanceStatus.UNKNOWN.name());
+ monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
+ Assert
+ .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
+ monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
+ Assert.assertEquals(monitor.getRebalanceState(),
+ ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
+ monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+ Assert.assertEquals(monitor.getRebalanceState(),
+ ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
+ } finally {
+ // The monitor must be unregistered to clean up; otherwise, later tests may be affected and fail.
+ monitor.unregister();
}
+ }
+
+ @Test
+ public void testUpdatePartitionWeightStats() throws JMException, IOException {
+ final MBeanServerConnection mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ final String clusterName = TestHelper.getTestMethodName();
+ final String resource = "testDB";
+ final ObjectName resourceObjectName = new ObjectName("testDomain:key=value");
+ final ResourceMonitor monitor =
+ new ResourceMonitor(clusterName, resource, resourceObjectName);
+ monitor.register();
+
+ try {
+ Map<String, Map<String, Integer>> partitionWeightMap =
+ ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity2", 40));
+
+ // Update Metrics
+ partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats);
+
+ verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap);
+
+ // Change capacity keys: "capacity2" -> "capacity3"
+ partitionWeightMap =
+ ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity3", 60));
+
+ // Update metrics.
+ partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats);
- monitor.updateResourceState(externalView, idealState, stateModelDef);
-
- Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
- Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
- Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
- Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState);
- Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
-
- Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0);
- // test pending state transition message report and read
- int messageCount = new Random().nextInt(_partitions) + 1;
- monitor.updatePendingStateTransitionMessages(messageCount);
- Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
-
- Assert
- .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name());
- monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
- Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
- monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
- Assert.assertEquals(monitor.getRebalanceState(),
- ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
- monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
- Assert.assertEquals(monitor.getRebalanceState(),
- ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
+ // Verify results.
+ verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap);
+
+ // "capacity2" metric should not exist in MBean server.
+ String removedAttribute = "capacity2Gauge";
+ try {
+ mBeanServer.getAttribute(resourceObjectName, removedAttribute);
+ Assert.fail("AttributeNotFoundException should be thrown because attribute [capacity2Gauge]"
+ + " is removed.");
+ } catch (AttributeNotFoundException expected) {
+ }
+ } finally {
+ // Reset monitor.
+ monitor.unregister();
+ Assert.assertFalse(mBeanServer.isRegistered(resourceObjectName),
+ "Failed to unregister resource monitor.");
+ }
}
/**
@@ -240,4 +301,28 @@ public class TestResourceMonitor {
return copy;
}
+
+ private void verifyPartitionWeightMetrics(MBeanServerConnection mBeanServer,
+ ObjectName objectName, Map<String, Map<String, Integer>> expectedPartitionWeightMap)
+ throws IOException, AttributeNotFoundException, MBeanException, ReflectionException,
+ InstanceNotFoundException {
+ final String gaugeMetricSuffix = "Gauge";
+ for (Map.Entry<String, Map<String, Integer>> entry : expectedPartitionWeightMap.entrySet()) {
+ // Resource monitor for this resource is already registered.
+ Assert.assertTrue(mBeanServer.isRegistered(objectName));
+
+ for (Map.Entry<String, Integer> capacityEntry : entry.getValue().entrySet()) {
+ String attributeName = capacityEntry.getKey() + gaugeMetricSuffix;
+ try {
+ // Wait until the attribute is registered with the MBean server.
+ Assert.assertTrue(TestHelper.verify(
+ () -> !mBeanServer.getAttributes(objectName, new String[]{attributeName}).isEmpty(),
+ 2000));
+ } catch (Exception ignored) {
+ }
+ Assert.assertEquals((long) mBeanServer.getAttribute(objectName, attributeName),
+ (long) capacityEntry.getValue());
+ }
+ }
+ }
}