You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/14 19:25:54 UTC
git commit: [HELIX-444] add per-participant partition count gauges to
helix, rb=21419
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release 7b5250a34 -> 9661fd2f8
[HELIX-444] add per-participant partition count gauges to helix, rb=21419
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9661fd2f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9661fd2f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9661fd2f
Branch: refs/heads/helix-0.6.2-release
Commit: 9661fd2f832c904377959ece434e769c34f87f99
Parents: 7b5250a
Author: zzhang <zz...@apache.org>
Authored: Wed May 14 10:25:31 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed May 14 10:25:31 2014 -0700
----------------------------------------------------------------------
.../stages/BestPossibleStateCalcStage.java | 8 +
.../stages/BestPossibleStateOutput.java | 44 ++--
.../controller/stages/ClusterDataCache.java | 39 +---
.../stages/ExternalViewComputeStage.java | 2 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 181 +++++++++++++---
.../monitoring/mbeans/InstanceMonitor.java | 4 +-
.../mbeans/PerInstanceResourceMonitor.java | 146 +++++++++++++
.../mbeans/PerInstanceResourceMonitorMBean.java | 34 +++
.../monitoring/mbeans/ResourceMonitor.java | 21 +-
.../TestClusterStatusMonitorLifecycle.java | 40 ++--
.../mbeans/TestClusterStatusMonitor.java | 210 +++++++++++--------
.../monitoring/mbeans/TestResourceMonitor.java | 127 +++--------
12 files changed, 582 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 458218c..a4aa4a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -32,6 +32,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
@@ -61,6 +62,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
compute(event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
+ ClusterStatusMonitor clusterStatusMonitor =
+ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+ cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+ }
+
long endTime = System.currentTimeMillis();
logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 3da9bef..3dc4568 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -22,36 +22,52 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.model.Partition;
public class BestPossibleStateOutput {
- // resource->partition->instance->state
- Map<String, Map<Partition, Map<String, String>>> _dataMap;
+ // Map of resource->partition->instance->state
+ Map<String, Map<Partition, Map<String, String>>> _stateMap;
public BestPossibleStateOutput() {
- _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+ _stateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
}
- public void setState(String resourceName, Partition resource,
+ public Set<String> resourceSet() {
+ return _stateMap.keySet();
+ }
+
+ public void setState(String resourceName, Partition partition,
Map<String, String> bestInstanceStateMappingForResource) {
- if (!_dataMap.containsKey(resourceName)) {
- _dataMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ if (!_stateMap.containsKey(resourceName)) {
+ _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ }
+ Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
+ map.put(partition, bestInstanceStateMappingForResource);
+ }
+
+ public void setState(String resourceName, Partition partition, String instance, String state) {
+ if (!_stateMap.containsKey(resourceName)) {
+ _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ }
+
+ if (!_stateMap.get(resourceName).containsKey(partition)) {
+ _stateMap.get(resourceName).put(partition, new HashMap<String, String>());
}
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- map.put(resource, bestInstanceStateMappingForResource);
+ _stateMap.get(resourceName).get(partition).put(instance, state);
}
- public Map<String, String> getInstanceStateMap(String resourceName, Partition resource) {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+ public Map<String, String> getInstanceStateMap(String resourceName, Partition partition) {
+ Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
if (map != null) {
- return map.get(resource);
+ return map.get(partition);
}
return Collections.emptyMap();
}
public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+ Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
if (map != null) {
return map;
}
@@ -59,11 +75,11 @@ public class BestPossibleStateOutput {
}
public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
- return _dataMap;
+ return _stateMap;
}
@Override
public String toString() {
- return _dataMap.toString();
+ return _stateMap.toString();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2bf9f11..427873f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -312,37 +312,6 @@ public class ClusterDataCache {
}
}
- // public HealthStat getGlobalStats()
- // {
- // return _globalStats;
- // }
- //
- // public PersistentStats getPersistentStats()
- // {
- // return _persistentStats;
- // }
- //
- // public Alerts getAlerts()
- // {
- // return _alerts;
- // }
- //
- // public AlertStatus getAlertStatus()
- // {
- // return _alertStatus;
- // }
- //
- // public Map<String, HealthStat> getHealthStats(String instanceName)
- // {
- // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- // if (map != null)
- // {
- // return map;
- // } else
- // {
- // return Collections.emptyMap();
- // }
- // }
/**
* Provides the state model definition for a given state model
* @param stateModelDefRef
@@ -354,6 +323,14 @@ public class ClusterDataCache {
}
/**
+ * Provides all state model definitions
+ * @return state model definition map
+ */
+ public Map<String, StateModelDefinition> getStateModelDefMap() {
+ return _stateModelDefMap;
+ }
+
+ /**
* Provides the idealstate for a given resource
* @param resourceName
* @return
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 35ef177..7ef4584 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -111,7 +111,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
if (clusterStatusMonitor != null
&& !idealState.getStateModelDefRef().equalsIgnoreCase(
DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- clusterStatusMonitor.onExternalViewChange(view,
+ clusterStatusMonitor.setResourceStatus(view,
cache._idealStateMap.get(view.getResourceName()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..5389e76 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -32,8 +33,13 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
@@ -41,15 +47,15 @@ import com.google.common.collect.Sets;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
- static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+ public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
static final String RESOURCE_STATUS_KEY = "ResourceStatus";
- static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+ public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
static final String CLUSTER_DN_KEY = "cluster";
static final String RESOURCE_DN_KEY = "resourceName";
static final String INSTANCE_DN_KEY = "instanceName";
- static final String DEFAULT_TAG = "DEFAULT";
+ public static final String DEFAULT_TAG = "DEFAULT";
private final String _clusterName;
private final MBeanServer _beanServer;
@@ -68,20 +74,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
new ConcurrentHashMap<String, InstanceMonitor>();
+ /**
+ * PerInstanceResource bean map: beanName->bean
+ */
+ private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+ new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
public ClusterStatusMonitor(String clusterName) {
_clusterName = clusterName;
_beanServer = ManagementFactory.getPlatformMBeanServer();
try {
- register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ register(this, getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("Register self failed.", e);
+ LOG.error("Fail to regiter ClusterStatusMonitor", e);
}
}
public ObjectName getObjectName(String name) throws MalformedObjectNameException {
- return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+ return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
}
+ // TODO remove getBeanName()?
// Used by other external JMX consumers like ingraph
public String getBeanName() {
return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,21 +157,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
try {
- LOG.info("Registering " + name.toString());
+ LOG.info("Register MBean: " + name);
_beanServer.registerMBean(bean, name);
} catch (Exception e) {
- LOG.warn("Could not register MBean" + name, e);
+ LOG.warn("Could not register MBean: " + name, e);
}
}
private void unregister(ObjectName name) {
try {
if (_beanServer.isRegistered(name)) {
- LOG.info("Unregistering " + name.toString());
+ LOG.info("Unregister MBean: " + name);
_beanServer.unregisterMBean(name);
}
} catch (Exception e) {
- LOG.warn("Could not unregister MBean" + name, e);
+ LOG.warn("Could not unregister MBean: " + name, e);
}
}
@@ -227,28 +240,100 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+ /**
+ * Update gauges for resource at instance level
+ * @param bestPossibleStates
+ * @param resourceMap
+ * @param stateModelDefMap
+ */
+ public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+ Map<String, InstanceConfig> instanceConfigMap, Map<String, Resource> resourceMap,
+ Map<String, StateModelDefinition> stateModelDefMap) {
+
+ // Convert to perInstanceResource beanName->partition->state
+ Map<PerInstanceResourceMonitor.BeanName, Map<Partition, String>> beanMap =
+ new HashMap<PerInstanceResourceMonitor.BeanName, Map<Partition, String>>();
+
+ for (String resource : bestPossibleStates.resourceSet()) {
+ Map<Partition, Map<String, String>> partitionStateMap =
+ bestPossibleStates.getResourceMap(resource);
+ for (Partition partition : partitionStateMap.keySet()) {
+ Map<String, String> instanceStateMap = partitionStateMap.get(partition);
+ for (String instance : instanceStateMap.keySet()) {
+ String state = instanceStateMap.get(instance);
+ PerInstanceResourceMonitor.BeanName beanName =
+ new PerInstanceResourceMonitor.BeanName(instance, resource);
+ if (!beanMap.containsKey(beanName)) {
+ beanMap.put(beanName, new HashMap<Partition, String>());
+ }
+ beanMap.get(beanName).put(partition, state);
+ }
+ }
+ }
+
+ // Unregister beans for per-instance resources that no longer exist
+ Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+ Sets.newHashSet(_perInstanceResourceMap.keySet());
+ toUnregister.removeAll(beanMap.keySet());
+ try {
+ unregisterPerInstanceResources(toUnregister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+ }
+
+ // Register beans for per-instance resources that are newly configured
+ Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+ toRegister.removeAll(_perInstanceResourceMap.keySet());
+ Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+ for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+ PerInstanceResourceMonitor bean =
+ new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+ beanName.resourceName());
+ String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefName));
+ monitorsToRegister.add(bean);
+ }
+
+ try {
+ registerPerInstanceResources(monitorsToRegister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+ }
+
+ // Update existing beans
+ for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+ PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+ String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefName));
+ }
+ }
+
+ public void setResourceStatus(ExternalView externalView, IdealState idealState) {
try {
String resourceName = externalView.getId();
if (!_resourceMbeanMap.containsKey(resourceName)) {
synchronized (this) {
if (!_resourceMbeanMap.containsKey(resourceName)) {
ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
registerResources(Arrays.asList(bean));
}
}
}
ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
String oldSensorName = bean.getSensorName();
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
String newSensorName = bean.getSensorName();
if (!oldSensorName.equals(newSensorName)) {
unregisterResources(Arrays.asList(resourceName));
registerResources(Arrays.asList(bean));
}
} catch (Exception e) {
- LOG.warn(e);
+ LOG.error("Fail to set resource status", e);
}
}
@@ -264,18 +349,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
_instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
} catch (Exception e) {
- LOG.warn("fail to add message queue size to mbean", e);
+ LOG.error("Fail to add message queue size to mbean", e);
}
}
public void reset() {
- LOG.info("Resetting ClusterStatusMonitor");
+ LOG.info("Reset ClusterStatusMonitor");
try {
- for (String resourceName : _resourceMbeanMap.keySet()) {
- String beanName =
- CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
- unregister(getObjectName(beanName));
- }
+ unregisterResources(_resourceMbeanMap.keySet());
_resourceMbeanMap.clear();
for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +367,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
unregisterInstances(_instanceMbeanMap.keySet());
_instanceMbeanMap.clear();
- unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+
+ unregister(getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("fail to reset ClusterStatusMonitor", e);
+ LOG.error("Fail to reset ClusterStatusMonitor", e);
}
}
@@ -311,6 +394,28 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_instanceMbeanMap.keySet().removeAll(instances);
}
+ private synchronized void registerPerInstanceResources(
+ Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor monitor : monitors) {
+ String instanceName = monitor.getInstanceName();
+ String resourceName = monitor.getResourceName();
+ String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+ register(monitor, getObjectName(beanName));
+ _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+ resourceName), monitor);
+ }
+ }
+
+ private synchronized void unregisterPerInstanceResources(
+ Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+ throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+ unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+ beanName.resourceName())));
+ }
+ _perInstanceResourceMap.keySet().removeAll(beanNames);
+ }
+
private synchronized void registerResources(Collection<ResourceMonitor> resources)
throws MalformedObjectNameException {
for (ResourceMonitor monitor : resources) {
@@ -330,12 +435,38 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_resourceMbeanMap.keySet().removeAll(resources);
}
+ public String clusterBeanName() {
+ return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+ }
+
+ /**
+ * Build instance bean name
+ * @param instanceName
+ * @return instance bean name
+ */
private String getInstanceBeanName(String instanceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+ return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
}
+ /**
+ * Build resource bean name
+ * @param resourceName
+ * @return resource bean name
+ */
private String getResourceBeanName(String resourceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+ return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+ }
+
+ /**
+ * Build per-instance resource bean name:
+ * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+ * @param instanceName
+ * @param resourceName
+ * @return per-instance resource bean name
+ */
+ public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+ return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+ instanceName, resourceName).toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
- + serializedTags() + "." + _participantName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..714767b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,146 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+ public static class BeanName {
+ private final String _instanceName;
+ private final String _resourceName;
+
+ public BeanName(String instanceName, String resourceName) {
+ if (instanceName == null || resourceName == null) {
+ throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+ + ", resourceName: " + resourceName);
+ }
+ _instanceName = instanceName;
+ _resourceName = resourceName;
+ }
+
+ public String instanceName() {
+ return _instanceName;
+ }
+
+ public String resourceName() {
+ return _resourceName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof BeanName)) {
+ return false;
+ }
+
+ BeanName that = (BeanName) obj;
+ return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+ ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+ }
+ }
+
+ private final String _clusterName;
+ private List<String> _tags;
+ private final String _participantName;
+ private final String _resourceName;
+ private long _partitions;
+
+ public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+ _clusterName = clusterName;
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ _participantName = participantName;
+ _resourceName = resourceName;
+ _partitions = 0;
+ }
+
+ @Override
+ public String getSensorName() {
+ return Joiner
+ .on('.')
+ .join(
+ ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName, _resourceName)).toString();
+ }
+
+ private String serializedTags() {
+ return Joiner.on('|').skipNulls().join(_tags).toString();
+ }
+
+ @Override
+ public long getPartitionGauge() {
+ return _partitions;
+ }
+
+ public String getInstanceName() {
+ return _participantName;
+ }
+
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
+ * Update per-instance resource bean
+ * @param stateMap partition->state
+ * @param tags instance tags
+ * @param stateModelDef
+ */
+ public synchronized void update(Map<Partition, String> stateMap, Set<String> tags,
+ StateModelDefinition stateModelDef) {
+ if (tags == null || tags.isEmpty()) {
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ } else {
+ _tags = Lists.newArrayList(tags);
+ Collections.sort(_tags);
+ }
+
+ int cnt = 0;
+ for (String state : stateMap.values()) {
+ // Skip DROPPED and initial state (e.g. OFFLINE)
+ if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())
+ || state.equalsIgnoreCase(stateModelDef.getInitialState())) {
+ continue;
+ }
+ cnt++;
+ }
+ _partitions = cnt;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+ /**
+ * Get the number of partitions of the resource that the best possible state
+ * places on the instance, not counting those in DROPPED or the initial state
+ * @return number of partitions
+ */
+ long getPartitionGauge();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a07c79d..7304033 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -27,14 +27,15 @@ import org.apache.helix.model.IdealState;
import org.apache.log4j.Logger;
public class ResourceMonitor implements ResourceMonitorMBean {
- private int _numOfPartitions;
- int _numOfPartitionsInExternalView;
- int _numOfErrorPartitions;
- int _externalViewIdealStateDiff;
- String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
- String _resourceName, _clusterName;
+ private int _numOfPartitions;
+ private int _numOfPartitionsInExternalView;
+ private int _numOfErrorPartitions;
+ private int _externalViewIdealStateDiff;
+ private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+ private final String _clusterName;
+ private final String _resourceName;
public ResourceMonitor(String clusterName, String resourceName) {
_clusterName = clusterName;
@@ -58,15 +59,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
- + _resourceName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+ _tag, _resourceName);
}
public String getResourceName() {
return _resourceName;
}
- public void updateExternalView(ExternalView externalView, IdealState idealState) {
+ public void updateResource(ExternalView externalView, IdealState idealState) {
if (externalView == null) {
LOG.warn("external view is null");
return;
@@ -91,7 +92,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
}
// TODO fix this; IdealState shall have either map fields (CUSTOM mode)
- // or list fields (AUDO mode)
+ // or list fields (AUTO mode)
for (String partitionName : idealState.getRecord().getMapFields().keySet()) {
Map<String, String> idealRecord = idealState.getInstanceStateMap(partitionName);
Map<String, String> externalViewRecord = externalView.getStateMap(partitionName);
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 45bc709..58b691a 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -18,12 +18,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
MockParticipantManager[] _participants;
ClusterDistributedController[] _controllers;
@@ -157,12 +159,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
@Override
public void onMBeanRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Register mbean: " + mbsNotification.getMBeanName());
_nMbeansRegistered++;
}
@Override
public void onMBeanUnRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
_nMbeansUnregistered++;
}
}
@@ -177,9 +181,11 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
_participants[0].disconnect();
- // participant goes away. should be no change in number of beans as config is still present
+ // 1 participant goes away
+ // No change in instance/resource mbean
+ // Unregister 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
@@ -196,19 +202,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Thread.sleep(1000);
// 1 cluster status monitor, 1 resource monitor, 5 instances
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+ // Unregister 1+4+1 per-instance resource mbeans
+ // Register 4 per-instance resource mbeans
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
String instanceName = "localhost0_" + (12918 + 0);
_participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
_participants[0].syncStart();
- // participant goes back. should be no change
+ // 1 participant comes back
+ // No change in instance/resource mbean
+ // Register 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
- // Add a resource, one more mbean registered
+ // Add a resource
+ // Register 1 resource mbean
+ // Register 5 per-instance resource mbeans
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
@@ -218,14 +230,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Integer.parseInt(idealState.getReplicas()));
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
- // remove resource, no change
+ // Remove a resource
+ // No change in instance/resource mbean
+ // Unregister 5 per-instance resource mbeans
setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 711aff2..723d969 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,101 +19,145 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.tools.DefaultIdealStateCalculator;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import com.beust.jcommander.internal.Maps;
+
public class TestClusterStatusMonitor {
- List<String> _instances;
- List<ZNRecord> _liveInstances;
- String _db = "DB";
- String _db2 = "TestDB";
- int _replicas = 3;
- int _partitions = 50;
- ZNRecord _externalView, _externalView2;
-
- class MockDataAccessor extends Mocks.MockAccessor {
- public MockDataAccessor() {
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- }
- ZNRecord externalView =
- DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
-
- ZNRecord externalView2 =
- DefaultIdealStateCalculator.calculateIdealState(_instances, 80, 2, _db2, "MASTER",
- "SLAVE");
+ private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+ @Test()
+ public void testReportData() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 5;
+ String testDB = "TestDB";
+ String testDB_0 = testDB + "_0";
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ } catch (Exception e) {
+ Assert.fail("Fail to register ClusterStatusMonitor");
}
- public ZNRecord getProperty(PropertyType type, String resource) {
- if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
- if (resource.equals(_db)) {
- return _externalView;
- } else if (resource.equals(_db2)) {
- return _externalView2;
- }
- }
- return null;
+ // Test #setPerInstanceResourceStatus()
+ BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+ bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12918", "MASTER");
+ bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12919", "SLAVE");
+ bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12920", "SLAVE");
+ bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12921", "OFFLINE");
+ bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12922", "DROPPED");
+
+ Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ InstanceConfig config = new InstanceConfig(instanceName);
+ instanceConfigMap.put(instanceName, config);
}
- }
- class MockHelixManager extends Mocks.MockManager {
- MockDataAccessor _accessor = new MockDataAccessor();
-
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- return _accessor;
+ Map<String, Resource> resourceMap = Maps.newHashMap();
+ Resource db = new Resource(testDB);
+ db.setStateModelDefRef("MasterSlave");
+ db.addPartition(testDB_0);
+ resourceMap.put(testDB, db);
+
+ Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+ StateModelDefinition msStateModelDef =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+
+ // localhost_12918 should have 1 partition because it's MASTER
+ ObjectName objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ Object value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+ value = _server.getAttribute(objName, "SensorName");
+ Assert.assertTrue(value instanceof String);
+ Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+ ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+ "localhost_12918", testDB));
+
+ // localhost_12919 should have 1 partition because it's SLAVE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+
+ // localhost_12921 should have 0 partition because it's OFFLINE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // localhost_12922 should have 0 partition because it's DROPPED
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // Removing localhost_12918 from the best possible state should unregister its per-instance resource mbean
+ bestPossibleStates.getInstanceStateMap(testDB, new Partition(testDB_0)).remove(
+ "localhost_12918");
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+ } catch (InstanceNotFoundException e) {
+ // OK
}
- }
+ // Clean up
+ monitor.reset();
- @Test()
- public void TestReportData() {
- System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
- List<String> _instances;
- List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
- String _db = "DB";
- int _replicas = 3;
- int _partitions = 50;
-
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
- _liveInstances.add(metaData);
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
+
+ } catch (InstanceNotFoundException e) {
+ // OK
}
- ZNRecord externalView =
- DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
-
- ZNRecord externalView2 =
- DefaultIdealStateCalculator.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER",
- "SLAVE");
-
- List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
- externalViews.add(externalView);
- externalViews.add(externalView2);
-
- ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
- MockHelixManager manager = new MockHelixManager();
- NotificationContext context = new NotificationContext(manager);
- System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ Assert.fail("Fail to unregister ClusterStatusMonitor");
+ } catch (InstanceNotFoundException e) {
+ // OK
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index d631dd2..652eb88 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -22,21 +22,13 @@ package org.apache.helix.monitoring.mbeans;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.Mocks;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
import org.apache.helix.monitoring.mbeans.ResourceMonitor;
import org.apache.helix.tools.DefaultIdealStateCalculator;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class TestResourceMonitor {
@@ -45,108 +37,53 @@ public class TestResourceMonitor {
int _replicas = 3;
int _partitions = 50;
- class MockHelixManager extends Mocks.MockManager {
- class MockDataAccessor extends Mocks.MockAccessor {
- @Override
- public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) {
- List<T> result = new ArrayList<T>();
- PropertyType type = key.getType();
- Class<? extends HelixProperty> clazz = key.getTypeClass();
- if (type == PropertyType.EXTERNALVIEW) {
- HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
- result.add((T) typedInstance);
- return result;
- } else if (type == PropertyType.LIVEINSTANCES) {
- return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
- }
-
- return result;
- }
-
- @Override
- public <T extends HelixProperty> T getProperty(PropertyKey key) {
- PropertyType type = key.getType();
- if (type == PropertyType.EXTERNALVIEW) {
- return (T) new ExternalView(_externalView);
- } else if (type == PropertyType.IDEALSTATES) {
- return (T) new IdealState(_idealState);
- }
- return null;
- }
- }
-
- HelixDataAccessor _accessor = new MockDataAccessor();
- ZNRecord _idealState;
- ZNRecord _externalView;
- List<String> _instances;
- List<ZNRecord> _liveInstances;
- String _db = "DB";
-
- public MockHelixManager() {
- _liveInstances = new ArrayList<ZNRecord>();
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
-
- }
- _idealState =
- DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas,
- _dbName, "MASTER", "SLAVE");
- _externalView = new ZNRecord(_idealState);
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- return _accessor;
- }
-
- }
-
@Test()
- public void TestReportData() {
- MockHelixManager manager = new MockHelixManager();
+ public void testReportData() {
+ final int n = 5;
ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
- IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < n; i++) {
+ String instance = "localhost_" + (12918 + i);
+ instances.add(instance);
+ }
- monitor.updateExternalView(externalView, idealState);
+ ZNRecord idealStateRecord =
+ DefaultIdealStateCalculator.calculateIdealState(instances, _partitions, _replicas, _dbName,
+ "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(idealStateRecord);
+ ExternalView externalView = new ExternalView(idealStateRecord);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
- monitor.getBeanName();
+ monitor.updateResource(externalView, idealState);
- int n = 4;
- for (int i = 0; i < n; i++) {
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ // monitor.getBeanName();
+
+ final int m = n - 1;
+ for (int i = 0; i < m; i++) {
Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
String key = map.keySet().toArray()[0].toString();
map.put(key, "ERROR");
externalView.setStateMap(_dbName + "_" + 3 * i, map);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- n = 5;
for (int i = 0; i < n; i++) {
externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
}
}