You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/02 00:54:17 UTC
[helix] branch wagedRebalancer updated: Add max capacity usage
metric for instance monitor. (#548)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 1d01466 Add max capacity usage metric for instance monitor. (#548)
1d01466 is described below
commit 1d014669321154a646a6fb9c60009a357742933f
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Fri Nov 1 17:54:11 2019 -0700
Add max capacity usage metric for instance monitor. (#548)
We need to monitor each instance's max utilization in order to understand what its max capacity usage is and to know the status of the instance.
Change list:
1. Change InstanceMonitor to extend DynamicMBeanProvider (dynamic metrics), and change the code logic in ClusterStatusMonitor to adapt to the InstanceMonitor changes.
2. Add APIs to get/update MaxCapacityUsage.
3. Add an API in cluster status monitor to update max capacity usage.
4. Add unit tests for the instance monitor and for updating max capacity usage.
---
.../rebalancer/waged/WagedRebalancer.java | 22 +--
.../waged/model/ClusterModelProvider.java | 17 +++
.../stages/CurrentStateComputationStage.java | 42 ++++++
.../controller/stages/CurrentStateOutput.java | 24 ++++
.../monitoring/mbeans/ClusterStatusMonitor.java | 72 ++++++----
.../helix/monitoring/mbeans/InstanceMonitor.java | 149 +++++++++++++++------
.../monitoring/mbeans/InstanceMonitorMBean.java | 51 -------
.../mbeans/TestClusterStatusMonitor.java | 136 +++++++++++++------
.../monitoring/mbeans/TestInstanceMonitor.java | 76 +++++++++++
9 files changed, 418 insertions(+), 171 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a88f809..605dcd1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -540,7 +540,7 @@ public class WagedRebalancer {
}
if (currentBaseline.isEmpty()) {
LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
- currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
+ currentBaseline = currentStateOutput.getAssignment(resources);
}
currentBaseline.keySet().retainAll(resources);
return currentBaseline;
@@ -575,30 +575,12 @@ public class WagedRebalancer {
if (currentBestAssignment.isEmpty()) {
LOG.warn(
"The current best possible assignment record is empty. Use the current states instead.");
- currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources);
+ currentBestAssignment = currentStateOutput.getAssignment(resources);
}
currentBestAssignment.keySet().retainAll(resources);
return currentBestAssignment;
}
- private Map<String, ResourceAssignment> getCurrentStateAssingment(
- CurrentStateOutput currentStateOutput, Set<String> resourceSet) {
- Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
- for (String resourceName : resourceSet) {
- Map<Partition, Map<String, String>> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName);
- if (!currentStateMap.isEmpty()) {
- ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
- currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
- newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
- currentStateEntry.getValue());
- });
- currentStateAssignment.put(resourceName, newResourceAssignment);
- }
- }
- return currentStateAssignment;
- }
-
/**
* Schedule rebalance according to the delayed rebalance logic.
* @param clusterData the current cluster data cache
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 4722e7d..f777534 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -94,6 +94,23 @@ public class ClusterModelProvider {
}
/**
+ * Generate a cluster model based on the current state output and data cache.
+ * @param dataProvider The controller's data cache.
+ * @param resourceMap The full list of the resources to be rebalanced. Note that any
+ * resources that are not in this list will be removed from the
+ * final assignment.
+ * @param currentStateAssignment The resource assignment built from current state output.
+ * @return A cluster model based on the current state and data cache.
+ */
+ public static ClusterModel generateClusterModelFromCurrentState(
+ ResourceControllerDataProvider dataProvider,
+ Map<String, Resource> resourceMap,
+ Map<String, ResourceAssignment> currentStateAssignment) {
+ return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
+ Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment);
+ }
+
+ /**
* Find the minimum set of replicas that need to be reassigned.
* A replica needs to be reassigned if one of the following condition is true:
* 1. Cluster topology (the cluster config / any instance config) has been updated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index d5c65cd..38c2261 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,14 +20,21 @@ package org.apache.helix.controller.stages;
*/
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.*;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +52,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
_eventId = event.getEventId();
BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+ final Map<String, Resource> resourceToRebalance =
+ event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
if (cache == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
@@ -69,6 +78,13 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
}
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ final ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
+ reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
+ resourceToRebalance, currentStateOutput);
+ }
}
// update all pending messages to CurrentStateOutput.
@@ -215,4 +231,30 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
}
}
+
+ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMonitor,
+ ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+ CurrentStateOutput currentStateOutput) {
+ asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+ try {
+ Map<String, ResourceAssignment> currentStateAssignment =
+ currentStateOutput.getAssignment(resourceMap.keySet());
+ ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromCurrentState(
+ dataProvider, resourceMap, currentStateAssignment);
+
+ Map<String, Double> maxUsageMap = new HashMap<>();
+ for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
+ String instanceName = node.getInstanceName();
+ double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
+ maxUsageMap.put(instanceName, usage);
+ }
+
+ clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
+ } catch (Exception ex) {
+ LOG.error("Failed to report instance capacity metrics.", ex);
+ }
+
+ return null;
+ });
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 13e1dbf..87dcae3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -28,6 +28,8 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import com.google.common.collect.Sets;
+import org.apache.helix.model.ResourceAssignment;
+
/**
* The current state includes both current state and pending messages
@@ -428,4 +430,26 @@ public class CurrentStateOutput {
return sb.toString();
}
+ /**
+ * Get current state assignment for a set of resources.
+ * @param resourceSet a set of resources' names
+ * @return a map of current state resource assignment, {resourceName: resourceAssignment}
+ */
+ public Map<String, ResourceAssignment> getAssignment(Set<String> resourceSet) {
+ Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
+ for (String resourceName : resourceSet) {
+ Map<Partition, Map<String, String>> currentStateMap =
+ getCurrentStateMap(resourceName);
+ if (!currentStateMap.isEmpty()) {
+ ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
+ currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
+ newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
+ currentStateEntry.getValue());
+ });
+ currentStateAssignment.put(resourceName, newResourceAssignment);
+ }
+ }
+
+ return currentStateAssignment;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 5e8c17a..97d0a96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -236,27 +236,28 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
// Unregister beans for instances that are no longer configured
Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
toUnregister.removeAll(instanceSet);
- try {
- unregisterInstances(toUnregister);
- } catch (MalformedObjectNameException e) {
- LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
- }
+ unregisterInstances(toUnregister);
// Register beans for instances that are newly configured
Set<String> toRegister = Sets.newHashSet(instanceSet);
toRegister.removeAll(_instanceMonitorMap.keySet());
Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
for (String instanceName : toRegister) {
- InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
- bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
- oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
- !disabledInstanceSet.contains(instanceName));
- monitorsToRegister.add(bean);
+ try {
+ ObjectName objectName = getObjectName(getInstanceBeanName(instanceName));
+ InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName, objectName);
+ bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+ oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
+ !disabledInstanceSet.contains(instanceName));
+ monitorsToRegister.add(bean);
+ } catch (MalformedObjectNameException ex) {
+ LOG.error("Failed to create instance monitor for instance: {}.", instanceName);
+ }
}
try {
registerInstances(monitorsToRegister);
- } catch (MalformedObjectNameException e) {
- LOG.error("Could not register instances with MBean server: " + toRegister, e);
+ } catch (JMException e) {
+ LOG.error("Could not register instances with MBean server: {}.", toRegister, e);
}
// Update all the sets
@@ -282,8 +283,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
try {
unregisterInstances(Arrays.asList(instanceName));
registerInstances(Arrays.asList(bean));
- } catch (MalformedObjectNameException e) {
- LOG.error("Could not refresh registration with MBean server: " + instanceName, e);
+ } catch (JMException e) {
+ LOG.error("Could not refresh registration with MBean server: {}", instanceName, e);
}
}
}
@@ -366,6 +367,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
/**
+ * Update max capacity usage for per instance. Before calling this API, we assume the instance
+ * monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
+ * max usage update will fail.
+ *
+ * @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
+ */
+ public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
+ synchronized (_instanceMonitorMap) {
+ for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+ InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
+ if (monitor == null) {
+ LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
+ entry.getKey());
+ continue;
+ }
+ monitor.updateMaxCapacityUsage(entry.getValue());
+ }
+ }
+ }
+
+ /**
* Update gauges for resource at instance level
* @param bestPossibleStates
* @param resourceMap
@@ -694,31 +716,35 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
private void registerInstances(Collection<InstanceMonitor> instances)
- throws MalformedObjectNameException {
+ throws JMException {
synchronized (_instanceMonitorMap) {
for (InstanceMonitor monitor : instances) {
String instanceName = monitor.getInstanceName();
- String beanName = getInstanceBeanName(instanceName);
- register(monitor, getObjectName(beanName));
+ // If this instance MBean is already registered, unregister it.
+ InstanceMonitor removedMonitor = _instanceMonitorMap.remove(instanceName);
+ if (removedMonitor != null) {
+ removedMonitor.unregister();
+ }
+ monitor.register();
_instanceMonitorMap.put(instanceName, monitor);
}
}
}
- private void unregisterAllInstances() throws MalformedObjectNameException {
+ private void unregisterAllInstances() {
synchronized (_instanceMonitorMap) {
unregisterInstances(_instanceMonitorMap.keySet());
}
}
- private void unregisterInstances(Collection<String> instances)
- throws MalformedObjectNameException {
+ private void unregisterInstances(Collection<String> instances) {
synchronized (_instanceMonitorMap) {
for (String instanceName : instances) {
- String beanName = getInstanceBeanName(instanceName);
- unregister(getObjectName(beanName));
+ InstanceMonitor monitor = _instanceMonitorMap.remove(instanceName);
+ if (monitor != null) {
+ monitor.unregister();
+ }
}
- _instanceMonitorMap.keySet().removeAll(instances);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index dc43d48..eea6eaf 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -24,35 +24,86 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.management.JMException;
+import javax.management.ObjectName;
+
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
/**
* Implementation of the instance status bean
*/
-public class InstanceMonitor implements InstanceMonitorMBean {
+public class InstanceMonitor extends DynamicMBeanProvider {
+ /**
+ * Metric names for instance capacity.
+ */
+ public enum InstanceMonitorMetrics {
+ // TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
+ TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
+ ENABLED_STATUS_GAUGE("Enabled"),
+ ONLINE_STATUS_GAUGE("Online"),
+ DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
+ MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
+
+ private String metricName;
+
+ InstanceMonitorMetrics(String name) {
+ metricName = name;
+ }
+
+ public String metricName() {
+ return metricName;
+ }
+ }
+
private final String _clusterName;
private final String _participantName;
+ private final ObjectName _initObjectName;
+
private List<String> _tags;
- private long _disabledPartitions;
- private boolean _isUp;
- private boolean _isEnabled;
- private long _totalMessageReceived;
+
+ // Counters
+ private SimpleDynamicMetric<Long> _totalMessagedReceivedCounter;
+
+ // Gauges
+ private SimpleDynamicMetric<Long> _enabledStatusGauge;
+ private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
+ private SimpleDynamicMetric<Long> _onlineStatusGauge;
+ private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
/**
* Initialize the bean
* @param clusterName the cluster to monitor
* @param participantName the instance whose statistics this holds
*/
- public InstanceMonitor(String clusterName, String participantName) {
+ public InstanceMonitor(String clusterName, String participantName, ObjectName objectName) {
_clusterName = clusterName;
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
- _disabledPartitions = 0L;
- _isUp = false;
- _isEnabled = false;
- _totalMessageReceived = 0;
+ _initObjectName = objectName;
+
+ createMetrics();
+ }
+
+ private void createMetrics() {
+ _totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
+ InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
+
+ _disabledPartitionsGauge =
+ new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
+ 0L);
+ _enabledStatusGauge =
+ new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
+ _onlineStatusGauge =
+ new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
+ _maxCapacityUsageGauge =
+ new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
+ 0.0d);
}
@Override
@@ -61,44 +112,32 @@ public class InstanceMonitor implements InstanceMonitorMBean {
serializedTags(), _participantName);
}
- @Override
- public long getOnline() {
- return _isUp ? 1 : 0;
- }
-
- @Override
- public long getEnabled() {
- return _isEnabled ? 1 : 0;
+ protected long getOnline() {
+ return _onlineStatusGauge.getValue();
}
- @Override
- public long getTotalMessageReceived() {
- return _totalMessageReceived;
+ protected long getEnabled() {
+ return _enabledStatusGauge.getValue();
}
- @Override
- public long getDisabledPartitions() {
- return _disabledPartitions;
+ protected long getTotalMessageReceived() {
+ return _totalMessagedReceivedCounter.getValue();
}
- /**
- * Get all the tags currently on this instance
- * @return list of tags
- */
- public List<String> getTags() {
- return _tags;
+ protected long getDisabledPartitions() {
+ return _disabledPartitionsGauge.getValue();
}
/**
* Get the name of the monitored instance
* @return instance name as a string
*/
- public String getInstanceName() {
+ protected String getInstanceName() {
return _participantName;
}
private String serializedTags() {
- return Joiner.on('|').skipNulls().join(_tags).toString();
+ return Joiner.on('|').skipNulls().join(_tags);
}
/**
@@ -117,20 +156,22 @@ public class InstanceMonitor implements InstanceMonitorMBean {
_tags = Lists.newArrayList(tags);
Collections.sort(_tags);
}
- _disabledPartitions = 0L;
+ long numDisabledPartitions = 0L;
if (disabledPartitions != null) {
for (List<String> partitions : disabledPartitions.values()) {
if (partitions != null) {
- _disabledPartitions += partitions.size();
+ numDisabledPartitions += partitions.size();
}
}
}
// TODO : Get rid of this when old API removed.
if (oldDisabledPartitions != null) {
- _disabledPartitions += oldDisabledPartitions.size();
+ numDisabledPartitions += oldDisabledPartitions.size();
}
- _isUp = isLive;
- _isEnabled = isEnabled;
+
+ _onlineStatusGauge.updateValue(isLive ? 1L : 0L);
+ _enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
+ _disabledPartitionsGauge.updateValue(numDisabledPartitions);
}
/**
@@ -138,7 +179,39 @@ public class InstanceMonitor implements InstanceMonitorMBean {
* @param messageReceived received message numbers
*/
public synchronized void increaseMessageCount(long messageReceived) {
- _totalMessageReceived += messageReceived;
+ _totalMessagedReceivedCounter
+ .updateValue(_totalMessagedReceivedCounter.getValue() + messageReceived);
}
+ /**
+ * Update max capacity usage for this instance.
+ * @param maxUsage max capacity usage of this instance
+ */
+ public synchronized void updateMaxCapacityUsage(double maxUsage) {
+ _maxCapacityUsageGauge.updateValue(maxUsage);
+ }
+
+ /**
+ * Get max capacity usage of this instance.
+ * @return Max capacity usage of this instance.
+ */
+ protected synchronized double getMaxCapacityUsageGauge() {
+ return _maxCapacityUsageGauge.getValue();
+ }
+
+ @Override
+ public DynamicMBeanProvider register()
+ throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
+ _totalMessagedReceivedCounter,
+ _disabledPartitionsGauge,
+ _enabledStatusGauge,
+ _onlineStatusGauge,
+ _maxCapacityUsageGauge
+ );
+
+ doRegister(attributeList, _initObjectName);
+
+ return this;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
deleted file mode 100644
index a3221d8..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * A basic bean describing the status of a single instance
- */
-public interface InstanceMonitorMBean extends SensorNameProvider {
- /**
- * Check if this instance is live
- * @return 1 if running, 0 otherwise
- */
- public long getOnline();
-
- /**
- * Check if this instance is enabled
- * @return 1 if enabled, 0 if disabled
- */
- public long getEnabled();
-
- /**
- * Get total message received for this instances
- * @return The total number of messages sent to this instance
- */
- public long getTotalMessageReceived();
-
- /**
- * Get the total disabled partitions number for this instance
- * @return The total number of disabled partitions
- */
- public long getDisabledPartitions();
-}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index b2daba6..cdd8688 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,18 +19,27 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-import com.beust.jcommander.internal.Maps;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
@@ -47,13 +56,15 @@ import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.Assert;
import org.testng.annotations.Test;
+
public class TestClusterStatusMonitor {
private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
- String testDB = "TestDB";
- String testDB_0 = testDB + "_0";
+ private String testDB = "TestDB";
+ private String testDB_0 = testDB + "_0";
@Test()
- public void testReportData() throws Exception {
+ public void testReportData()
+ throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -64,11 +75,8 @@ public class TestClusterStatusMonitor {
ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
monitor.active();
ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
- try {
- _server.getMBeanInfo(clusterMonitorObjName);
- } catch (Exception e) {
- Assert.fail("Fail to register ClusterStatusMonitor");
- }
+
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
// Test #setPerInstanceResourceStatus()
BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
@@ -137,42 +145,30 @@ public class TestClusterStatusMonitor {
"localhost_12918");
monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
stateModelDefMap);
- try {
- objName =
- monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
- _server.getMBeanInfo(objName);
- Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
-
- } catch (InstanceNotFoundException e) {
- // OK
- }
+
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ Assert.assertFalse(_server.isRegistered(objName),
+ "Fail to unregister PerInstanceResource mbean for localhost_12918");
// Clean up
monitor.reset();
- try {
- objName =
- monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
- _server.getMBeanInfo(objName);
- Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
-
- } catch (InstanceNotFoundException e) {
- // OK
- }
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+ Assert.assertFalse(_server.isRegistered(objName),
+ "Fail to unregister PerInstanceResource mbean for localhost_12920");
- try {
- _server.getMBeanInfo(clusterMonitorObjName);
- Assert.fail("Fail to unregister ClusterStatusMonitor");
- } catch (InstanceNotFoundException e) {
- // OK
- }
+ Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+ "Failed to unregister ClusterStatusMonitor.");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Test
- public void testResourceAggregation() throws JMException {
+ public void testResourceAggregation()
+ throws JMException, IOException {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -182,11 +178,8 @@ public class TestClusterStatusMonitor {
ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
monitor.active();
ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
- try {
- _server.getMBeanInfo(clusterMonitorObjName);
- } catch (Exception e) {
- Assert.fail("Fail to register ClusterStatusMonitor");
- }
+
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
int numInstance = 5;
int numPartition = 10;
@@ -315,5 +308,70 @@ public class TestClusterStatusMonitor {
messageCount = new Random().nextInt(numPartition) + 1;
monitor.setResourceStatus(externalView, idealState, stateModelDef, messageCount);
Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount);
+
+ // Reset monitor.
+ monitor.reset();
+ Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+ "Failed to unregister ClusterStatusMonitor.");
+ }
+
+ @Test
+ public void testUpdateMaxCapacityUsage()
+ throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
+ ReflectionException, InstanceNotFoundException {
+ String clusterName = "testCluster";
+ List<Double> maxUsageList = ImmutableList.of(0.0d, 0.32d, 0.85d, 1.0d, 0.50d, 0.75d);
+ Map<String, Double> maxUsageMap = new HashMap<>();
+ for (int i = 0; i < maxUsageList.size(); i++) {
+ maxUsageMap.put("instance" + i, maxUsageList.get(i));
+ }
+
+ // Setup cluster status monitor.
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ monitor.active();
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+ // Before calling setClusterInstanceStatus, instance monitors are not yet registered.
+ for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+ String instance = entry.getKey();
+ String instanceBeanName =
+ String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+ Assert.assertFalse(_server.isRegistered(instanceObjectName));
+ }
+
+ // Call setClusterInstanceStatus to register instance monitors.
+ monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
+ Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
+ Collections.emptyMap());
+ // Update max usage stats.
+ monitor.updateInstanceMaxUsage(maxUsageMap);
+
+ // Verify results.
+ for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+ String instance = entry.getKey();
+ double usage = entry.getValue();
+ String instanceBeanName =
+ String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+ Assert.assertTrue(_server.isRegistered(instanceObjectName));
+ Assert.assertEquals(_server.getAttribute(instanceObjectName, "MaxCapacityUsageGauge"), usage);
+ }
+
+ // Reset monitor.
+ monitor.reset();
+ Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+ "Failed to unregister ClusterStatusMonitor.");
+ for (String instance : maxUsageMap.keySet()) {
+ String instanceBeanName =
+ String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+ Assert.assertFalse(_server.isRegistered(instanceObjectName),
+ "Failed to unregister instance monitor for instance: " + instance);
+ }
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
new file mode 100644
index 0000000..709c5f7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
@@ -0,0 +1,76 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceMonitor {
+  /** Verifies InstanceMonitor metrics right after construction and after updates. */
+  @Test
+  public void testInstanceMonitor() throws JMException {
+    final String instanceName = "testInstance";
+    final Set<String> instanceTags = ImmutableSet.of("test", "DEFAULT");
+    final Map<String, List<String>> partitionsToDisable =
+        ImmutableMap.of("instance1", ImmutableList.of("partition1", "partition2"));
+    final InstanceMonitor instanceMonitor =
+        new InstanceMonitor("testCluster", instanceName, new ObjectName("testDomain:key=value"));
+
+    // A newly created monitor reports only the DEFAULT tag and zeroed metrics.
+    Assert.assertEquals(instanceMonitor.getSensorName(),
+        "ParticipantStatus.testCluster.DEFAULT.testInstance");
+    Assert.assertEquals(instanceMonitor.getInstanceName(), instanceName);
+    Assert.assertEquals(instanceMonitor.getOnline(), 0L);
+    Assert.assertEquals(instanceMonitor.getEnabled(), 0L);
+    Assert.assertEquals(instanceMonitor.getTotalMessageReceived(), 0L);
+    Assert.assertEquals(instanceMonitor.getDisabledPartitions(), 0L);
+    Assert.assertEquals(instanceMonitor.getMaxCapacityUsageGauge(), 0.0d);
+
+    // Record a capacity usage, some received messages, and a live/enabled state with tags.
+    instanceMonitor.updateMaxCapacityUsage(0.5d);
+    instanceMonitor.increaseMessageCount(10L);
+    instanceMonitor.updateInstance(instanceTags, partitionsToDisable, Collections.emptyList(),
+        true, true);
+
+    // Each update is reflected; the sensor name now carries the tags joined by '|'.
+    Assert.assertEquals(instanceMonitor.getTotalMessageReceived(), 10L);
+    Assert.assertEquals(instanceMonitor.getSensorName(),
+        "ParticipantStatus.testCluster.DEFAULT|test.testInstance");
+    Assert.assertEquals(instanceMonitor.getInstanceName(), instanceName);
+    Assert.assertEquals(instanceMonitor.getOnline(), 1L);
+    Assert.assertEquals(instanceMonitor.getEnabled(), 1L);
+    Assert.assertEquals(instanceMonitor.getDisabledPartitions(), 2L);
+    Assert.assertEquals(instanceMonitor.getMaxCapacityUsageGauge(), 0.5d);
+
+    // Release the monitor; this test never calls register() on it.
+    instanceMonitor.unregister();
+  }
+}