You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:19 UTC
[5/7] git commit: [HELIX-444] add per-participant partition count
gauges to helix, rb=21419
[HELIX-444] add per-participant partition count gauges to helix, rb=21419
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96aef71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96aef71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96aef71c
Branch: refs/heads/master
Commit: 96aef71c899dc1f3956e1211fc1e9a7459a258d1
Parents: 8527729
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 15:56:57 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 14:53:41 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/State.java | 6 +-
.../stages/BestPossibleStateCalcStage.java | 9 +
.../controller/stages/ClusterDataCache.java | 42 +---
.../stages/ExternalViewComputeStage.java | 23 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 182 +++++++++++++--
.../monitoring/mbeans/InstanceMonitor.java | 4 +-
.../mbeans/PerInstanceResourceMonitor.java | 147 +++++++++++++
.../mbeans/PerInstanceResourceMonitorMBean.java | 34 +++
.../monitoring/mbeans/ResourceMonitor.java | 23 +-
.../TestClusterStatusMonitorLifecycle.java | 42 ++--
.../mbeans/TestClusterStatusMonitor.java | 220 ++++++++++++-------
.../monitoring/mbeans/TestResourceMonitor.java | 59 ++---
12 files changed, 586 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index 3315987..aa98df2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -47,9 +47,11 @@ public class State {
@Override
public boolean equals(Object that) {
if (that instanceof State) {
- return this.toString().equals(((State) that).toString());
+ return this.toString().equalsIgnoreCase(((State) that).toString());
} else if (that instanceof String) {
- return _state.equals(that);
+ return _state.equalsIgnoreCase(that.toString());
+ } else if (that instanceof HelixDefinedState) {
+ return _state.equalsIgnoreCase(that.toString());
}
return false;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 644b9f6..2f93b7f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -42,6 +42,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
@@ -69,6 +70,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("Cluster");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
@@ -79,6 +81,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
compute(cluster, event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
+ ClusterStatusMonitor clusterStatusMonitor =
+ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+ cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+ }
+
long endTime = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 0c28bdf..8bcfaae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -79,12 +79,6 @@ public class ClusterDataCache {
boolean _init = true;
- // Map<String, Map<String, HealthStat>> _healthStatMap;
- // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- // private PersistentStats _persistentStats;
- // private Alerts _alerts;
- // private AlertStatus _alertStatus;
-
private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
/**
@@ -334,37 +328,6 @@ public class ClusterDataCache {
return _messageMap;
}
- // public HealthStat getGlobalStats()
- // {
- // return _globalStats;
- // }
- //
- // public PersistentStats getPersistentStats()
- // {
- // return _persistentStats;
- // }
- //
- // public Alerts getAlerts()
- // {
- // return _alerts;
- // }
- //
- // public AlertStatus getAlertStatus()
- // {
- // return _alertStatus;
- // }
- //
- // public Map<String, HealthStat> getHealthStats(String instanceName)
- // {
- // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- // if (map != null)
- // {
- // return map;
- // } else
- // {
- // return Collections.emptyMap();
- // }
- // }
/**
* Provides the state model definition for a given state model
* @param stateModelDefRef
@@ -375,8 +338,13 @@ public class ClusterDataCache {
}
/**
+<<<<<<< HEAD
* Get all state model definitions
* @return map of name to state model definition
+=======
+ * Provides all state model definitions
+ * @return state model definition map
+>>>>>>> 8d5c27c... [HELIX-444] add per-participant partition count gauges to helix, rb=21419
*/
public Map<String, StateModelDefinition> getStateModelDefMap() {
return _stateModelDefMap;
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a46acbd..3086a83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,7 +35,6 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -46,6 +45,7 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -66,10 +66,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("Cluster");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (manager == null || resourceMap == null || cluster == null) {
+ if (manager == null || resourceMap == null || cluster == null || cache == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|Cluster");
+ + ". Requires ClusterManager|RESOURCES|Cluster|ClusterDataCache");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -118,15 +119,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// Update cluster status monitor mbean
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
- if (clusterStatusMonitor != null && currentResource != null) {
- IdealState idealState = currentResource.getIdealState();
- if (idealState != null) {
- StateModelDefId stateModelDefId = idealState.getStateModelDefId();
- if (stateModelDefId != null
- && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
- clusterStatusMonitor.onExternalViewChange(view, idealState);
- }
+ IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+ if (idealState != null) {
+ if (clusterStatusMonitor != null
+ && !idealState.getStateModelDefRef().equalsIgnoreCase(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ clusterStatusMonitor.setResourceStatus(view,
+ cache._idealStateMap.get(view.getResourceName()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..99dee75 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -32,8 +33,18 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
@@ -41,15 +52,15 @@ import com.google.common.collect.Sets;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
- static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+ public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
static final String RESOURCE_STATUS_KEY = "ResourceStatus";
- static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+ public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
static final String CLUSTER_DN_KEY = "cluster";
static final String RESOURCE_DN_KEY = "resourceName";
static final String INSTANCE_DN_KEY = "instanceName";
- static final String DEFAULT_TAG = "DEFAULT";
+ public static final String DEFAULT_TAG = "DEFAULT";
private final String _clusterName;
private final MBeanServer _beanServer;
@@ -68,20 +79,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
new ConcurrentHashMap<String, InstanceMonitor>();
+ /**
+ * PerInstanceResource bean map: beanName->bean
+ */
+ private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+ new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
public ClusterStatusMonitor(String clusterName) {
_clusterName = clusterName;
_beanServer = ManagementFactory.getPlatformMBeanServer();
try {
- register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ register(this, getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("Register self failed.", e);
+ LOG.error("Fail to regiter ClusterStatusMonitor", e);
}
}
public ObjectName getObjectName(String name) throws MalformedObjectNameException {
- return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+ return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
}
+ // TODO remove getBeanName()?
// Used by other external JMX consumers like ingraph
public String getBeanName() {
return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,10 +162,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
try {
- LOG.info("Registering " + name.toString());
+ LOG.info("Register MBean: " + name);
_beanServer.registerMBean(bean, name);
} catch (Exception e) {
- LOG.warn("Could not register MBean" + name, e);
+ LOG.warn("Could not register MBean: " + name, e);
}
}
@@ -158,7 +176,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_beanServer.unregisterMBean(name);
}
} catch (Exception e) {
- LOG.warn("Could not unregister MBean" + name, e);
+ LOG.warn("Could not unregister MBean: " + name, e);
}
}
@@ -227,28 +245,98 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+ /**
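+ * Update per-instance resource gauges; instance tags come from instanceConfigMap (name->config)
+ * @param bestPossibleStates best possible assignment of each resource's partitions to instances
+ * @param resourceMap resourceId->resource config, used to look up the state model definition id
+ * @param stateModelDefMap state model definition name->state model definition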
+ */
+ public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+ Map<String, InstanceConfig> instanceConfigMap, Map<ResourceId, ResourceConfig> resourceMap,
+ Map<String, StateModelDefinition> stateModelDefMap) {
+
+ // Convert to perInstanceResource beanName->partition->state
+ Map<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>> beanMap =
+ new HashMap<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>>();
+ for (ResourceId resource : bestPossibleStates.getAssignedResources()) {
+ ResourceAssignment assignment = bestPossibleStates.getResourceAssignment(resource);
+ for (PartitionId partition : assignment.getMappedPartitionIds()) {
+ Map<ParticipantId, State> instanceStateMap = assignment.getReplicaMap(partition);
+ for (ParticipantId instance : instanceStateMap.keySet()) {
+ State state = instanceStateMap.get(instance);
+ PerInstanceResourceMonitor.BeanName beanName =
+ new PerInstanceResourceMonitor.BeanName(instance.toString(), resource.toString());
+ if (!beanMap.containsKey(beanName)) {
+ beanMap.put(beanName, new HashMap<PartitionId, State>());
+ }
+ beanMap.get(beanName).put(partition, state);
+ }
+ }
+ }
+ // Unregister beans for per-instance resources that no longer exist
+ Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+ Sets.newHashSet(_perInstanceResourceMap.keySet());
+ toUnregister.removeAll(beanMap.keySet());
+ try {
+ unregisterPerInstanceResources(toUnregister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+ }
+ // Register beans for per-instance resources that are newly configured
+ Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+ toRegister.removeAll(_perInstanceResourceMap.keySet());
+ Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+ for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+ PerInstanceResourceMonitor bean =
+ new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+ beanName.resourceName());
+ StateModelDefId stateModelDefId =
+ resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+ .getStateModelDefId();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefId.toString()));
+ monitorsToRegister.add(bean);
+ }
+ try {
+ registerPerInstanceResources(monitorsToRegister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+ }
+ // Update existing beans
+ for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+ PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+ StateModelDefId stateModelDefId =
+ resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+ .getStateModelDefId();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefId.toString()));
+ }
+ }
+
+ public void setResourceStatus(ExternalView externalView, IdealState idealState) {
try {
String resourceName = externalView.getId();
if (!_resourceMbeanMap.containsKey(resourceName)) {
synchronized (this) {
if (!_resourceMbeanMap.containsKey(resourceName)) {
ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
registerResources(Arrays.asList(bean));
}
}
}
ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
String oldSensorName = bean.getSensorName();
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
String newSensorName = bean.getSensorName();
if (!oldSensorName.equals(newSensorName)) {
unregisterResources(Arrays.asList(resourceName));
registerResources(Arrays.asList(bean));
}
} catch (Exception e) {
- LOG.warn(e);
+ LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
}
}
@@ -264,18 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
_instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
} catch (Exception e) {
- LOG.warn("fail to add message queue size to mbean", e);
+ LOG.error("Fail to add message queue size to mbean, instance: " + instanceName, e);
}
}
public void reset() {
- LOG.info("Resetting ClusterStatusMonitor");
+ LOG.info("Reset ClusterStatusMonitor");
try {
- for (String resourceName : _resourceMbeanMap.keySet()) {
- String beanName =
- CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
- unregister(getObjectName(beanName));
- }
+ unregisterResources(_resourceMbeanMap.keySet());
+
_resourceMbeanMap.clear();
for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +371,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
unregisterInstances(_instanceMbeanMap.keySet());
_instanceMbeanMap.clear();
- unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+ unregister(getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("fail to reset ClusterStatusMonitor", e);
+ LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
}
}
@@ -330,12 +416,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_resourceMbeanMap.keySet().removeAll(resources);
}
+ private synchronized void registerPerInstanceResources(
+ Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor monitor : monitors) {
+ String instanceName = monitor.getInstanceName();
+ String resourceName = monitor.getResourceName();
+ String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+ register(monitor, getObjectName(beanName));
+ _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+ resourceName), monitor);
+ }
+ }
+
+ private synchronized void unregisterPerInstanceResources(
+ Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+ throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+ unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+ beanName.resourceName())));
+ }
+ _perInstanceResourceMap.keySet().removeAll(beanNames);
+ }
+
+ public String clusterBeanName() {
+ return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+ }
+
+ /**
+ * Build instance bean name
+ * @param instanceName
+ * @return instance bean name
+ */
private String getInstanceBeanName(String instanceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+ return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
}
+ /**
+ * Build resource bean name
+ * @param resourceName
+ * @return resource bean name
+ */
private String getResourceBeanName(String resourceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+ return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+ }
+
+ /**
+ * Build per-instance resource bean name:
+ * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+ * @param instanceName
+ * @param resourceName
+ * @return per-instance resource bean name
+ */
+ public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+ return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+ instanceName, resourceName).toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
- + serializedTags() + "." + _participantName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..476445c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,147 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+ public static class BeanName {
+ private final String _instanceName;
+ private final String _resourceName;
+
+ public BeanName(String instanceName, String resourceName) {
+ if (instanceName == null || resourceName == null) {
+ throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+ + ", resourceName: " + resourceName);
+ }
+ _instanceName = instanceName;
+ _resourceName = resourceName;
+ }
+
+ public String instanceName() {
+ return _instanceName;
+ }
+
+ public String resourceName() {
+ return _resourceName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof BeanName)) {
+ return false;
+ }
+
+ BeanName that = (BeanName) obj;
+ return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+ ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+ }
+ }
+
+ private final String _clusterName;
+ private List<String> _tags;
+ private final String _participantName;
+ private final String _resourceName;
+ private long _partitions;
+
+ public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+ _clusterName = clusterName;
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ _participantName = participantName;
+ _resourceName = resourceName;
+ _partitions = 0;
+ }
+
+ @Override
+ public String getSensorName() {
+ return Joiner
+ .on('.')
+ .join(
+ ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName, _resourceName)).toString();
+ }
+
+ private String serializedTags() {
+ return Joiner.on('|').skipNulls().join(_tags).toString();
+ }
+
+ @Override
+ public long getPartitionGauge() {
+ return _partitions;
+ }
+
+ public String getInstanceName() {
+ return _participantName;
+ }
+
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
+ * Update per-instance resource bean
+ * @param stateMap partition->state
+ * @param tags instance tags
+ * @param stateModelDef
+ */
+ public synchronized void update(Map<PartitionId, State> stateMap, Set<String> tags,
+ StateModelDefinition stateModelDef) {
+ if (tags == null || tags.isEmpty()) {
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ } else {
+ _tags = Lists.newArrayList(tags);
+ Collections.sort(_tags);
+ }
+
+ int cnt = 0;
+ for (State state : stateMap.values()) {
+ // Skip DROPPED and initial state (e.g. OFFLINE)
+ if (state.equals(HelixDefinedState.DROPPED)
+ || state.equals(stateModelDef.getTypedInitialState())) {
+ continue;
+ }
+ cnt++;
+ }
+ _partitions = cnt;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+ /**
+ * Get the number of partitions of the resource in best possible ideal state
+ * for the instance
+ * @return number of partitions
+ */
+ long getPartitionGauge();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d1ba595..4739fab 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import java.util.Collections;
import java.util.Map;
import org.apache.helix.HelixDefinedState;
@@ -30,14 +31,15 @@ import org.apache.helix.model.IdealState;
import org.apache.log4j.Logger;
public class ResourceMonitor implements ResourceMonitorMBean {
- private int _numOfPartitions;
- int _numOfPartitionsInExternalView;
- int _numOfErrorPartitions;
- int _externalViewIdealStateDiff;
- String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
- String _resourceName, _clusterName;
+ private int _numOfPartitions;
+ private int _numOfPartitionsInExternalView;
+ private int _numOfErrorPartitions;
+ private int _externalViewIdealStateDiff;
+ private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+ private String _resourceName;
+ private String _clusterName;
public ResourceMonitor(String clusterName, String resourceName) {
_clusterName = clusterName;
@@ -61,15 +63,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
- + _resourceName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+ _tag, _resourceName);
}
public String getResourceName() {
return _resourceName;
}
- public void updateExternalView(ExternalView externalView, IdealState idealState) {
+ public void updateResource(ExternalView externalView, IdealState idealState) {
if (externalView == null) {
LOG.warn("external view is null");
return;
@@ -97,6 +99,9 @@ public class ResourceMonitor implements ResourceMonitorMBean {
// or list fields (AUDO mode)
for (PartitionId partitionId : idealState.getPartitionIdSet()) {
Map<ParticipantId, State> idealRecord = idealState.getParticipantStateMap(partitionId);
+ if (idealRecord == null) {
+ idealRecord = Collections.emptyMap();
+ }
Map<ParticipantId, State> externalViewRecord = externalView.getStateMap(partitionId);
if (externalViewRecord == null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index a00db67..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -37,12 +37,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
MockParticipantManager[] _participants;
ClusterDistributedController[] _controllers;
@@ -176,12 +178,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
@Override
public void onMBeanRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Register mbean: " + mbsNotification.getMBeanName());
_nMbeansRegistered++;
}
@Override
public void onMBeanUnRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
_nMbeansUnregistered++;
}
}
@@ -196,10 +200,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
_participants[0].disconnect();
- // participant goes away. should be no change in number of beans as config is still present
+ // 1 participant goes away
+ // No change in instance/resource mbean
+ // Unregister 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
String firstControllerName =
@@ -215,19 +221,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Thread.sleep(1000);
// 1 cluster status monitor, 1 resource monitor, 5 instances
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+ // Unregister 1+4+1 per-instance resource mbean
+ // Register 4 per-instance resource mbean
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
String instanceName = "localhost0_" + (12918 + 0);
_participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
_participants[0].syncStart();
- // participant goes back. should be no change
+ // 1 participant comes back
+ // No change in instance/resource mbean
+ // Register 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
- // Add a resource, one more mbean registered
+ // Add a resource
+ // Register 1 resource mbean
+ // Register 5 per-instance resource mbean
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
@@ -237,14 +249,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Integer.parseInt(idealState.getReplicas()));
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
- // remove resource, no change
+ // Remove a resource
+ // No change in instance/resource mbean
+ // Unregister 5 per-instance resource mbean
setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index facb4ea..8c9ab01 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,98 +19,162 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.Maps;
+
public class TestClusterStatusMonitor {
- List<String> _instances;
- List<ZNRecord> _liveInstances;
- String _db = "DB";
- String _db2 = "TestDB";
- int _replicas = 3;
- int _partitions = 50;
- ZNRecord _externalView, _externalView2;
-
- class MockDataAccessor extends Mocks.MockAccessor {
- public MockDataAccessor() {
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- }
- ZNRecord externalView =
- DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
-
- ZNRecord externalView2 =
- DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, _db2, "MASTER", "SLAVE");
- }
+ private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+
+ @Test()
+ public void testReportData() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 5;
+ String testDB = "TestDB";
+ String testDB_0 = testDB + "_0";
- public ZNRecord getProperty(PropertyType type, String resource) {
- if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
- if (resource.equals(_db)) {
- return _externalView;
- } else if (resource.equals(_db2)) {
- return _externalView2;
- }
- }
- return null;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ } catch (Exception e) {
+ Assert.fail("Fail to register ClusterStatusMonitor");
}
- }
- class MockHelixManager extends Mocks.MockManager {
- MockDataAccessor _accessor = new MockDataAccessor();
+ // Test #setPerInstanceResourceStatus()
+ BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+ ResourceAssignment assignment = new ResourceAssignment(ResourceId.from(testDB));
+ Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+ replicaMap.put(ParticipantId.from("localhost_12918"), State.from("MASTER"));
+ replicaMap.put(ParticipantId.from("localhost_12919"), State.from("SLAVE"));
+ replicaMap.put(ParticipantId.from("localhost_12920"), State.from("SLAVE"));
+ replicaMap.put(ParticipantId.from("localhost_12921"), State.from("OFFLINE"));
+ replicaMap.put(ParticipantId.from("localhost_12922"), State.from("DROPPED"));
+ assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+ bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- return _accessor;
+ Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ InstanceConfig config = new InstanceConfig(instanceName);
+ instanceConfigMap.put(instanceName, config);
}
- }
+ Map<ResourceId, ResourceConfig> resourceMap = Maps.newHashMap();
+ ResourceId resourceId = ResourceId.from(testDB);
+ RebalancerConfig rebalancerConfig =
+ new SemiAutoRebalancerConfig.Builder(resourceId)
+ .addPartition(new Partition(PartitionId.from(testDB_0)))
+ .stateModelDefId(StateModelDefId.from("MasterSlave")).build();
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
+ resourceMap.put(resourceId, resourceConfig);
- @Test()
- public void TestReportData() {
- System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
- List<String> _instances;
- List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
- String _db = "DB";
- int _replicas = 3;
- int _partitions = 50;
-
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
- _liveInstances.add(metaData);
+ Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+ StateModelDefinition msStateModelDef =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+
+ // localhost_12918 should have 1 partition because it's MASTER
+ ObjectName objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ Object value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+ value = _server.getAttribute(objName, "SensorName");
+ Assert.assertTrue(value instanceof String);
+ Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+ ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+ "localhost_12918", testDB));
+
+ // localhost_12919 should have 1 partition because it's SLAVE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+
+ // localhost_12921 should have 0 partitions because it's OFFLINE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // localhost_12922 should have 0 partitions because it's DROPPED
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // Removing localhost_12918 from the best possible state should unregister its mbean
+ replicaMap.remove(ParticipantId.from("localhost_12918"));
+ assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+ bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+ } catch (InstanceNotFoundException e) {
+ // OK
}
- ZNRecord externalView =
- DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
- ZNRecord externalView2 =
- DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER", "SLAVE");
+ // Clean up
+ monitor.reset();
+
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
- List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
- externalViews.add(externalView);
- externalViews.add(externalView2);
+ } catch (InstanceNotFoundException e) {
+ // OK
+ }
+
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ Assert.fail("Fail to unregister ClusterStatusMonitor");
+ } catch (InstanceNotFoundException e) {
+ // OK
+ }
- ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
- MockHelixManager manager = new MockHelixManager();
- NotificationContext context = new NotificationContext(manager);
- System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index e8bb4b6..dcca755 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -28,14 +28,13 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.Mocks;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class TestResourceMonitor {
@@ -106,46 +105,52 @@ public class TestResourceMonitor {
}
@Test()
- public void TestReportData() {
- MockHelixManager manager = new MockHelixManager();
+ public void testReportData() {
+ final int n = 5;
ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
- IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < n; i++) {
+ String instance = "localhost_" + (12918 + i);
+ instances.add(instance);
+ }
- monitor.updateExternalView(externalView, idealState);
+ ZNRecord idealStateRecord =
+ DefaultTwoStateStrategy.calculateIdealState(instances, _partitions, _replicas, _dbName,
+ "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(idealStateRecord);
+ ExternalView externalView = new ExternalView(idealStateRecord);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
- monitor.getBeanName();
+ monitor.updateResource(externalView, idealState);
- int n = 4;
- for (int i = 0; i < n; i++) {
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ // monitor.getBeanName();
+
+ final int m = n - 1;
+ for (int i = 0; i < m; i++) {
Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
String key = map.keySet().toArray()[0].toString();
map.put(key, "ERROR");
externalView.setStateMap(_dbName + "_" + 3 * i, map);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- n = 5;
for (int i = 0; i < n; i++) {
externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
}
}