You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[18/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
new file mode 100644
index 0000000..4a44c8b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
+import org.apache.log4j.Logger;
+
+
+public class ParticipantMonitor
+{
+  private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap
+   = new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
+  private static final Logger LOG = Logger.getLogger(ParticipantMonitor.class);
+
+  private MBeanServer _beanServer;
+
+  public ParticipantMonitor()
+  {
+    try
+    {
+      _beanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+    catch(Exception e)
+    {
+      LOG.warn(e);
+      e.printStackTrace();
+      _beanServer = null;
+    }
+  }
+
+  public void reportTransitionStat(StateTransitionContext cxt,
+      StateTransitionDataPoint data)
+  {
+    if(_beanServer == null)
+    {
+      LOG.warn("bean server is null, skip reporting");
+      return;
+    }
+    try
+    {
+      if(!_monitorMap.containsKey(cxt))
+      {
+        synchronized(this)
+        {
+          if(!_monitorMap.containsKey(cxt))
+          {
+            StateTransitionStatMonitor bean = new StateTransitionStatMonitor(cxt, TimeUnit.MILLISECONDS);
+            _monitorMap.put(cxt, bean);
+            String beanName = cxt.toString();
+            register(bean, getObjectName(beanName));
+          }
+        }
+      }
+      _monitorMap.get(cxt).addDataPoint(data);
+    }
+    catch(Exception e)
+    {
+      LOG.warn(e);
+      e.printStackTrace();
+    }
+  }
+
+
+  private ObjectName getObjectName(String name) throws MalformedObjectNameException
+  {
+    LOG.info("Registering bean: "+name);
+    return new ObjectName("CLMParticipantReport:"+name);
+  }
+
+  private void register(Object bean, ObjectName name)
+  {
+    if(_beanServer == null)
+    {
+      LOG.warn("bean server is null, skip reporting");
+      return;
+    }
+    try
+    {
+      _beanServer.unregisterMBean(name);
+    }
+    catch (Exception e1)
+    {
+      // Swallow silently
+    }
+
+    try
+    {
+      _beanServer.registerMBean(bean, name);
+    }
+    catch (Exception e)
+    {
+      LOG.warn("Could not register MBean", e);
+    }
+  }
+
+  public void shutDown()
+  {
+    for(StateTransitionContext cxt : _monitorMap.keySet() )
+    {
+      try
+      {
+        ObjectName name = getObjectName(cxt.toString());
+        if (_beanServer.isRegistered(name))
+        {
+          _beanServer.unregisterMBean(name);
+        }
+      }
+      catch (Exception e)
+      {
+        LOG.warn("fail to unregister " + cxt.toString(), e);
+      }
+    }
+    _monitorMap.clear();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
new file mode 100644
index 0000000..e5fc914
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
@@ -0,0 +1,6 @@
+package org.apache.helix.monitoring;
+
+public interface SensorNameProvider
+{
+  String getSensorName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
new file mode 100644
index 0000000..b4e6ef9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
+
+public class StatCollector
+{
+  private static final int DEFAULT_WINDOW_SIZE = 100;
+  private final DescriptiveStatistics _stats;
+  private long _numDataPoints;
+  private long _totalSum;
+
+  public StatCollector()
+  {
+    _stats = new SynchronizedDescriptiveStatistics();
+    _stats.setWindowSize(DEFAULT_WINDOW_SIZE);
+  }
+
+  public void addData(double data)
+  {
+    _numDataPoints++;
+    _totalSum += data;
+    _stats.addValue(data);
+  }
+
+  public long getTotalSum()
+  {
+    return _totalSum;
+  }
+
+  public DescriptiveStatistics getStatistics()
+  {
+    return _stats;
+  }
+
+  public long getNumDataPoints()
+  {
+    return _numDataPoints;
+  }
+
+  public void reset()
+  {
+    _numDataPoints = 0;
+    _totalSum = 0;
+    _stats.clear();
+  }
+
+  public double getMean()
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getMean();
+  }
+
+  public double getMax()
+  {
+    return _stats.getMax();
+  }
+
+  public double getMin()
+  {
+    return _stats.getMin();
+  }
+
+  public double getPercentile(int percentage)
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getPercentile(percentage*1.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
new file mode 100644
index 0000000..58f8a04
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+public class StateTransitionContext
+{
+  private final String _resourceName;
+  private final String _clusterName;
+  private final String _instanceName;
+  private final String _transition;
+  
+  public StateTransitionContext(
+      String clusterName, 
+      String instanceName,
+      String resourceName, 
+      String transition
+      )
+  {
+    _clusterName = clusterName;
+    _resourceName = resourceName; 
+    _transition = transition;
+    _instanceName = instanceName;
+  }
+  
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+  
+  public String getInstanceName()
+  {
+    return _instanceName;
+  }
+  
+  public String getResourceName()
+  {
+    return _resourceName;
+  }
+  
+  public String getTransition()
+  {
+    return _transition;
+  }
+  
+  @Override
+  public boolean equals(Object other)
+  {
+    if(! (other instanceof StateTransitionContext))
+    {
+      return false;
+    }
+    
+    StateTransitionContext otherCxt = (StateTransitionContext) other;  
+    return
+      _clusterName.equals(otherCxt.getClusterName()) &&
+      // _instanceName.equals(otherCxt.getInstanceName()) &&
+      _resourceName.equals(otherCxt.getResourceName()) &&
+      _transition.equals(otherCxt.getTransition()) ;
+  }
+    
+
+  // In the report, we will gather per transition time statistics
+ @Override
+  public int hashCode()
+  {
+    return toString().hashCode();
+  }
+  
+  public String toString()
+  {
+     return "Cluster=" + _clusterName + "," + 
+           // "instance=" + _instanceName + "," +
+           "Resource=" + _resourceName +"," + 
+           "Transition=" + _transition;    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
new file mode 100644
index 0000000..cdf3024
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+public class StateTransitionDataPoint
+{
+  long _totalDelay;
+  long _executionDelay;
+  boolean _isSuccess;
+  
+  public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess)
+  {
+    _totalDelay = totalDelay;
+    _executionDelay = executionDelay;
+    _isSuccess = isSuccess;
+  }
+  
+  public long getTotalDelay()
+  {
+    return _totalDelay;
+  }
+  
+  public long getExecutionDelay()
+  {
+    return _executionDelay;
+  }
+  
+  public boolean getSuccess()
+  {
+    return _isSuccess;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
new file mode 100644
index 0000000..87fbef9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.List;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ZKPathDataDumpTask extends TimerTask
+{
+  static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
+
+  private final int _thresholdNoChangeInMs;
+  private final HelixManager _manager;
+  private final ZkClient _zkClient;
+
+  public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs)
+  {
+    _manager = manager;
+    _zkClient = zkClient;
+    logger.info("Scannning cluster statusUpdate " + manager.getClusterName()
+        + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+    _thresholdNoChangeInMs = thresholdNoChangeInMs;
+  }
+
+  @Override
+  public void run()
+  {
+    // For each record in status update and error node
+    // TODO: for now the status updates are dumped to cluster manager log4j log.
+    // We need to think if we should create per-instance log files that contains
+    // per-instance statusUpdates
+    // and errors
+    logger.info("Scannning status updates ...");
+    try
+    {
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+
+      List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+      for (String instanceName : instances)
+      {
+        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
+            PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
+        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
+            PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
+      }
+      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+          PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
+
+      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+          PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
+    } catch (Exception e)
+    {
+      logger.error(e);
+      e.printStackTrace();
+    }
+  }
+
+  void scanPath(String path, int thresholdNoChangeInMs)
+  {
+    logger.info("Scannning path " + path);
+    List<String> subPaths = _zkClient.getChildren(path);
+    for (String subPath : subPaths)
+    {
+      try
+      {
+        String nextPath = path + "/" + subPath;
+        List<String> subSubPaths = _zkClient.getChildren(nextPath);
+        for (String subsubPath : subSubPaths)
+        {
+          try
+          {
+            checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
+          } catch (Exception e)
+          {
+            logger.error(e);
+          }
+        }
+      } catch (Exception e)
+      {
+        logger.error(e);
+      }
+    }
+  }
+
+  void checkAndDump(String path, int thresholdNoChangeInMs)
+  {
+    List<String> subPaths = _zkClient.getChildren(path);
+    if(subPaths.size() == 0)
+    {
+      subPaths.add("");
+    }
+    for (String subPath : subPaths)
+    {
+      String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
+      Stat pathStat = _zkClient.getStat(fullPath);
+
+      long lastModifiedTimeInMs = pathStat.getMtime();
+      long nowInMs = new Date().getTime();
+      // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
+
+      // Check the last modified time
+      if (nowInMs > lastModifiedTimeInMs)
+      {
+        long timeDiff = nowInMs - lastModifiedTimeInMs;
+        if (timeDiff > thresholdNoChangeInMs)
+        {
+          logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
+          _zkClient.setZkSerializer(new ZNRecordSerializer());
+          ZNRecord record = _zkClient.readData(fullPath);
+
+          // dump the node content into log file
+          ObjectMapper mapper = new ObjectMapper();
+          SerializationConfig serializationConfig = mapper.getSerializationConfig();
+          serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+          StringWriter sw = new StringWriter();
+          try
+          {
+            mapper.writeValue(sw, record);
+            logger.info(sw.toString());
+          } catch (Exception e)
+          {
+            logger.warn(
+                    "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
+                    e);
+          }
+          // Delete the leaf data
+          _zkClient.deleteRecursive(fullPath);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
new file mode 100644
index 0000000..a4dc212
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+
+import java.util.Date;
+
+import org.apache.helix.alerts.AlertValueAndStatus;
+
+
+public class ClusterAlertItem implements ClusterAlertItemMBean
+{
+  String _alertItemName;
+  double  _alertValue;
+  int _alertFired;
+  String _additionalInfo = "";
+  AlertValueAndStatus _valueAndStatus;
+  long _lastUpdateTime = 0;
+  
+  public ClusterAlertItem(String name, AlertValueAndStatus valueAndStatus)
+  {
+    _valueAndStatus = valueAndStatus;
+    _alertItemName = name;
+    refreshValues();
+  }
+  @Override
+  public String getSensorName()
+  {
+    return _alertItemName;
+  }
+
+  @Override
+  public double getAlertValue()
+  {
+    return _alertValue;
+  }
+  
+  public void setValueMap(AlertValueAndStatus valueAndStatus)
+  {
+    _valueAndStatus = valueAndStatus;
+    refreshValues();
+  }
+  
+  void refreshValues()
+  {
+    _lastUpdateTime = new Date().getTime();
+    if(_valueAndStatus.getValue().getElements().size() > 0)
+    {
+      _alertValue = Double.parseDouble(_valueAndStatus.getValue().getElements().get(0));
+    }
+    else
+    {
+      _alertValue = 0;
+    }
+    _alertFired = _valueAndStatus.isFired() ?  1 : 0;
+  }
+  @Override
+  public int getAlertFired()
+  {
+    return _alertFired;
+  }
+  
+  public void setAdditionalInfo(String additionalInfo)
+  {
+    _additionalInfo = additionalInfo;
+  }
+  
+  @Override
+  public String getAdditionalInfo()
+  {
+    return _additionalInfo;
+  }
+  
+  public void reset()
+  {
+    _alertFired = 0;
+    _additionalInfo = "";
+    _alertValue = 0;
+  }
+  
+  public long getLastUpdateTime()
+  {
+    return _lastUpdateTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
new file mode 100644
index 0000000..1bcf1e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+public interface ClusterAlertItemMBean
+{
+  String getSensorName();
+  
+  double getAlertValue();
+  
+  int getAlertFired();
+
+  String getAdditionalInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
new file mode 100644
index 0000000..a1441e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.alerts.Tuple;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ClusterAlertMBeanCollection
+{
+  public static String DOMAIN_ALERT = "HelixAlerts";
+  public static String ALERT_SUMMARY = "AlertSummary";
+  
+  private static final Logger _logger = Logger.getLogger(ClusterAlertMBeanCollection.class);
+  ConcurrentHashMap<String, ClusterAlertItem> _alertBeans 
+    = new ConcurrentHashMap<String, ClusterAlertItem>();
+  
+  Map<String, String> _recentAlertDelta;
+  ClusterAlertSummary _clusterAlertSummary;
+  ZNRecord _alertHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
+  Set<String> _previousFiredAlerts = new HashSet<String>();
+  // 5 min for mbean freshness threshold
+  public static final long ALERT_NOCHANGE_THRESHOLD = 5 * 60 * 1000;
+    
+  final MBeanServer _beanServer;
+  
+  public interface ClusterAlertSummaryMBean extends ClusterAlertItemMBean
+  {
+    public String getAlertFiredHistory();
+  }
+  
+  class ClusterAlertSummary extends ClusterAlertItem implements ClusterAlertSummaryMBean
+  {
+    public ClusterAlertSummary(String name, AlertValueAndStatus valueAndStatus)
+    {
+      super(name, valueAndStatus);
+    }
+    /**
+     * Returns the previous 100 alert mbean turn on / off history
+     * */
+    @Override
+    public String getAlertFiredHistory()
+    {
+      try
+      {
+        ObjectMapper mapper = new ObjectMapper();
+        SerializationConfig serializationConfig = mapper.getSerializationConfig();
+        serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+        StringWriter sw = new StringWriter();
+        mapper.writeValue(sw, _alertHistory);
+        return sw.toString();
+      }
+      catch(Exception e)
+      {
+        _logger.warn("", e);
+        return "";
+      }
+    }
+  }
+  
+  
+  public ClusterAlertMBeanCollection()
+  {
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+  }
+  
+  public Collection<ClusterAlertItemMBean> getCurrentAlertMBeans()
+  {
+    ArrayList<ClusterAlertItemMBean> beans = new ArrayList<ClusterAlertItemMBean>();
+    for(ClusterAlertItem item : _alertBeans.values())
+    {
+      beans.add(item);
+    }
+    return beans;
+  }
+
+  void onNewAlertMbeanAdded(ClusterAlertItemMBean bean)
+  {
+    try
+    {
+      _logger.info("alert bean " + bean.getSensorName()+" exposed to jmx");
+      System.out.println("alert bean " + bean.getSensorName()+" exposed to jmx");
+      ObjectName objectName =  new ObjectName(DOMAIN_ALERT+":alert="+bean.getSensorName());
+      register(bean, objectName);
+    } 
+    catch (Exception e)
+    {
+      _logger.error("", e);
+      e.printStackTrace();
+    }
+  }
+  
+  public void setAlerts(String originAlert, Map<String, AlertValueAndStatus> alertResultMap, String clusterName)
+  {
+    if(alertResultMap == null)
+    {
+      _logger.warn("null alertResultMap");
+      return;
+    }
+    for(String alertName : alertResultMap.keySet())
+    {
+      String beanName = "";
+      if(alertName.length() > 1)
+      {
+        String comparator = AlertParser.getComponent(AlertParser.COMPARATOR_NAME, originAlert);
+        String constant = AlertParser.getComponent(AlertParser.CONSTANT_NAME, originAlert);
+        beanName = "("+ alertName+")" + comparator + "("+constant+")";
+      }
+      else
+      {
+        beanName = originAlert + "--(" + alertName + ")";
+      }
+      // This is to make JMX happy; certain charaters cannot be in JMX bean name
+      beanName = beanName.replace('*', '%').replace('=', '#').replace(',', ';');
+      if(!_alertBeans.containsKey(beanName))
+      {
+        ClusterAlertItem item = new ClusterAlertItem(beanName, alertResultMap.get(alertName));
+        onNewAlertMbeanAdded(item);
+        _alertBeans.put(beanName, item);
+      }
+      else
+      {
+        _alertBeans.get(beanName).setValueMap(alertResultMap.get(alertName));
+      }
+    }
+    refreshSummayAlert(clusterName);
+  }
+  
+  public void setAlertHistory(ZNRecord alertHistory)
+  {
+    _alertHistory = alertHistory;
+  }
+  /**
+   *  The summary alert is a combination of all alerts, if it is on, something is wrong on this 
+   *  cluster. The additional info contains all alert mbean names that has been fired.
+   */
+  void refreshSummayAlert(String clusterName)
+  {
+    boolean fired = false;
+    String alertsFired = "";
+    String summaryKey = ALERT_SUMMARY + "_" + clusterName;
+    for(String key : _alertBeans.keySet())
+    {
+      if(!key.equals(summaryKey))
+      {
+        ClusterAlertItem item = _alertBeans.get(key);
+        fired = (item.getAlertFired() == 1) | fired;
+        if(item.getAlertFired() == 1)
+        {
+          alertsFired += item._alertItemName;
+          alertsFired += ";";
+        }
+      }
+    }
+    Tuple<String> t = new Tuple<String>();
+    t.add("0");
+    AlertValueAndStatus summaryStatus = new AlertValueAndStatus(t, fired);
+    if(!_alertBeans.containsKey(summaryKey))
+    {
+      ClusterAlertSummary item = new ClusterAlertSummary(summaryKey, summaryStatus);
+      onNewAlertMbeanAdded(item);
+      item.setAdditionalInfo(alertsFired);
+      _alertBeans.put(summaryKey, item);
+      _clusterAlertSummary = item;
+    }
+    else
+    {
+      _alertBeans.get(summaryKey).setValueMap(summaryStatus);
+      _alertBeans.get(summaryKey).setAdditionalInfo(alertsFired);
+    }
+  }
+  
+  void register(Object bean, ObjectName name)
+  {
+    try
+    {
+      _beanServer.unregisterMBean(name);
+    }
+    catch (Exception e)
+    {
+    }
+    try
+    {
+      _beanServer.registerMBean(bean, name);
+    }
+    catch (Exception e)
+    {
+      _logger.error("Could not register MBean", e);
+    }
+  }
+  
+  public void reset()
+  {
+    for(String beanName : _alertBeans.keySet())
+    {
+      ClusterAlertItem item = _alertBeans.get(beanName);
+      item.reset();
+      try
+      {
+        ObjectName objectName =  new ObjectName(DOMAIN_ALERT+":alert="+item.getSensorName());
+        _beanServer.unregisterMBean(objectName);
+      }
+      catch (Exception e)
+      {
+        _logger.warn("", e);
+      }
+    }
+    _alertBeans.clear();
+  }
+
+  public void refreshAlertDelta(String clusterName)
+  {
+    // Update the alert turn on/turn off history
+    String summaryKey = ALERT_SUMMARY + "_" + clusterName;
+    Set<String> currentFiredAlerts = new HashSet<String>();
+    for(String key : _alertBeans.keySet())
+    {
+      if(!key.equals(summaryKey))
+      {
+        ClusterAlertItem item = _alertBeans.get(key);
+        if(item.getAlertFired() == 1)
+        {
+          currentFiredAlerts.add(item._alertItemName);
+        }
+      }
+    }
+    
+    Map<String, String> onOffAlertsMap = new HashMap<String, String>();
+    for(String alertName : currentFiredAlerts)
+    {
+      if(!_previousFiredAlerts.contains(alertName))
+      {
+        onOffAlertsMap.put(alertName, "ON");
+        _logger.info(alertName + " ON");
+        _previousFiredAlerts.add(alertName);
+      }
+    }      
+    for(String cachedAlert : _previousFiredAlerts)
+    {
+      if(!currentFiredAlerts.contains(cachedAlert))
+      {
+        onOffAlertsMap.put(cachedAlert, "OFF");
+        _logger.info(cachedAlert + " OFF");
+      }
+    }
+    for(String key : onOffAlertsMap.keySet())
+    {
+      if(onOffAlertsMap.get(key).equals("OFF"))
+      {
+        _previousFiredAlerts.remove(key);
+      }
+    }
+    if(onOffAlertsMap.size() == 0)
+    {
+      _logger.info("No MBean change");
+    }
+    _recentAlertDelta = onOffAlertsMap;
+
+    checkMBeanFreshness(ALERT_NOCHANGE_THRESHOLD);
+  }
+  
+  public Map<String, String> getRecentAlertDelta()
+  {
+    return _recentAlertDelta;
+  }
+  
+  /**
+   * Remove mbeans that has not been changed for thresholdInMs MS
+   * */
+  void checkMBeanFreshness(long thresholdInMs)
+  {
+    long now = new Date().getTime();
+    Set<String> oldBeanNames = new HashSet<String>();
+    // Get mbean items that has not been updated for thresholdInMs
+    for(String beanName : _alertBeans.keySet())
+    {
+      ClusterAlertItem item = _alertBeans.get(beanName);
+      if(now - item.getLastUpdateTime() > thresholdInMs)
+      {
+        oldBeanNames.add(beanName);
+        _logger.info("bean " + beanName+" has not been updated for "+ thresholdInMs +" MS");
+      }
+    }
+    for(String beanName : oldBeanNames)
+    {
+      ClusterAlertItem item = _alertBeans.get(beanName);
+      _alertBeans.remove(beanName);
+      try
+      {
+        item.reset();      
+        ObjectName objectName =  new ObjectName(DOMAIN_ALERT+":alert="+item.getSensorName());
+        _beanServer.unregisterMBean(objectName);
+      }
+      catch (Exception e)
+      {
+        _logger.warn("", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
new file mode 100644
index 0000000..dde8a49
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ReflectionException;
+import javax.management.relation.MBeanServerNotificationFilter;
+
+import org.apache.log4j.Logger;
+
+/*
+ * TODO: this class should be in espresso common, as the only usage of it is
+ * to create ingraph adaptors
+ * **/
+public abstract class ClusterMBeanObserver implements NotificationListener
+{
+  protected final String _domain;
+  protected MBeanServerConnection _server;
+  private static final Logger _logger = Logger.getLogger(ClusterMBeanObserver.class);
+      
+  public ClusterMBeanObserver(String domain) 
+      throws InstanceNotFoundException, IOException, MalformedObjectNameException, NullPointerException
+  {
+    // Get a reference to the target MBeanServer
+    _domain = domain;
+    _server = ManagementFactory.getPlatformMBeanServer();
+    MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+    filter.enableAllObjectNames();
+    _server.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
+  }
+  
+  public void handleNotification(Notification notification, Object handback)
+  {
+    MBeanServerNotification mbs = (MBeanServerNotification) notification;
+    if(MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(mbs.getType())) 
+    {
+      if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      {
+        _logger.info("MBean Registered, name :" + mbs.getMBeanName());
+        onMBeanRegistered(_server, mbs);
+      } 
+    }
+    else if(MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(mbs.getType())) 
+    {
+      if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      {
+        _logger.info("MBean Unregistered, name :" + mbs.getMBeanName());
+        onMBeanUnRegistered(_server, mbs);
+      }
+    }
+  }
+  
+  public void disconnect()
+  {
+    MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+    try
+    {
+      _server.removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this);
+    }
+    catch (Exception e)
+    {
+      _logger.error("", e);
+    }
+  }
+  
+  public abstract void onMBeanRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);
+  
+  public abstract void onMBeanUnRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
new file mode 100644
index 0000000..4499add
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.log4j.Logger;
+
+
+public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
+{
+  private static final Logger                                  LOG                       =
+                                                                                             Logger.getLogger(ClusterStatusMonitor.class);
+
+  static final String                                          CLUSTER_STATUS_KEY        =
+                                                                                             "ClusterStatus";
+  static final String                                          MESSAGE_QUEUE_STATUS_KEY  =
+                                                                                             "MessageQueueStatus";
+  static final String                                          RESOURCE_STATUS_KEY       =
+                                                                                             "ResourceStatus";
+  static final String                                          CLUSTER_DN_KEY            =
+                                                                                             "cluster";
+  static final String                                          RESOURCE_DN_KEY           =
+                                                                                             "resourceName";
+  static final String                                          INSTANCE_DN_KEY           =
+                                                                                             "instanceName";
+
+  private final String                                         _clusterName;
+  private final MBeanServer                                    _beanServer;
+
+  private int                                                  _numOfLiveInstances       =
+                                                                                             0;
+  private int                                                  _numOfInstances           =
+                                                                                             0;
+  private int                                                  _numOfDisabledInstances   =
+                                                                                             0;
+  private int                                                  _numOfDisabledPartitions  =
+                                                                                             0;
+
+  private final ConcurrentHashMap<String, ResourceMonitor>     _resourceMbeanMap         =
+                                                                                             new ConcurrentHashMap<String, ResourceMonitor>();
+
+  private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
+                                                                                             new ConcurrentHashMap<String, MessageQueueMonitor>();
+
+  public ClusterStatusMonitor(String clusterName)
+  {
+    _clusterName = clusterName;
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+    try
+    {
+      register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+    }
+    catch (Exception e)
+    {
+      LOG.error("Register self failed.", e);
+    }
+  }
+
+  public ObjectName getObjectName(String name) throws MalformedObjectNameException
+  {
+    return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+  }
+
+  // Used by other external JMX consumers like ingraph
+  public String getBeanName()
+  {
+    return CLUSTER_STATUS_KEY + " " + _clusterName;
+  }
+
+  @Override
+  public long getDownInstanceGauge()
+  {
+    return _numOfInstances - _numOfLiveInstances;
+  }
+
+  @Override
+  public long getInstancesGauge()
+  {
+    return _numOfInstances;
+  }
+
+  @Override
+  public long getDisabledInstancesGauge()
+  {
+    return _numOfDisabledInstances;
+  }
+
+  @Override
+  public long getDisabledPartitionsGauge()
+  {
+    return _numOfDisabledPartitions;
+  }
+
+  @Override
+  public long getMaxMessageQueueSizeGauge()
+  {
+    long maxQueueSize = 0;
+    for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values())
+    {
+      if (msgQueue.getMaxMessageQueueSize() > maxQueueSize)
+      {
+        maxQueueSize = (long)msgQueue.getMaxMessageQueueSize();
+      }
+    }
+    
+    return maxQueueSize;
+  }
+
+  @Override
+  public String getMessageQueueSizes()
+  {
+    Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
+    for (String instance : _instanceMsgQueueMbeanMap.keySet())
+    {
+      MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
+        msgQueueSizes.put(instance, new Long( (long)msgQueue.getMaxMessageQueueSize()));
+    }
+    
+    return msgQueueSizes.toString();
+  }
+
+  private void register(Object bean, ObjectName name)
+  {
+    try
+    {
+      if (_beanServer.isRegistered(name))
+      {
+        _beanServer.unregisterMBean(name);
+      }
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+
+    try
+    {
+      LOG.info("Registering " + name.toString());
+      _beanServer.registerMBean(bean, name);
+    }
+    catch (Exception e)
+    {
+      LOG.warn("Could not register MBean" + name, e);
+    }
+  }
+
+  private void unregister(ObjectName name)
+  {
+    try
+    {
+      if (_beanServer.isRegistered(name))
+      {
+        LOG.info("Unregistering " + name.toString());
+        _beanServer.unregisterMBean(name);
+      }
+    }
+    catch (Exception e)
+    {
+      LOG.warn("Could not unregister MBean" + name, e);
+    }
+  }
+
+  public void setClusterStatusCounters(int numberLiveInstances,
+                                       int numberOfInstances,
+                                       int disabledInstances,
+                                       int disabledPartitions)
+  {
+    _numOfInstances = numberOfInstances;
+    _numOfLiveInstances = numberLiveInstances;
+    _numOfDisabledInstances = disabledInstances;
+    _numOfDisabledPartitions = disabledPartitions;
+  }
+
+  public void onExternalViewChange(ExternalView externalView, IdealState idealState)
+  {
+    try
+    {
+      String resourceName = externalView.getId();
+      if (!_resourceMbeanMap.containsKey(resourceName))
+      {
+        synchronized (this)
+        {
+          if (!_resourceMbeanMap.containsKey(resourceName))
+          {
+            ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
+            String beanName =
+                CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
+                    + resourceName;
+            register(bean, getObjectName(beanName));
+            _resourceMbeanMap.put(resourceName, bean);
+          }
+        }
+      }
+      _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
+    }
+    catch (Exception e)
+    {
+      LOG.warn(e);
+    }
+  }
+
+  public void addMessageQueueSize(String instanceName, int msgQueueSize)
+  {
+    try
+    {
+      if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
+      {
+        synchronized (this)
+        {
+          if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
+          {
+            MessageQueueMonitor bean =
+                new MessageQueueMonitor(_clusterName, instanceName);
+            _instanceMsgQueueMbeanMap.put(instanceName, bean);
+          }
+        }
+      }
+      _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
+    }
+    catch (Exception e)
+    {
+      LOG.warn("fail to add message queue size to mbean", e);
+    }
+  }
+
+  public void reset()
+  {
+    LOG.info("Resetting ClusterStatusMonitor");
+    try
+    {
+      for (String resourceName : _resourceMbeanMap.keySet())
+      {
+        String beanName =
+            CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
+                + resourceName;
+        unregister(getObjectName(beanName));
+      }
+      _resourceMbeanMap.clear();
+
+      for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values())
+      {
+        bean.reset();
+      }
+      _instanceMsgQueueMbeanMap.clear();
+
+      unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+    }
+    catch (Exception e)
+    {
+      LOG.error("fail to reset ClusterStatusMonitor", e);
+    }
+  }
+
+  @Override
+  public String getSensorName()
+  {
+    return CLUSTER_STATUS_KEY + "_" + _clusterName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
new file mode 100644
index 0000000..bfdb9c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface ClusterStatusMonitorMBean extends SensorNameProvider
+{
+  public long getDownInstanceGauge();
+  
+  public long getInstancesGauge();
+  
+  public long getDisabledInstancesGauge();
+  
+  public long getDisabledPartitionsGauge();
+  
+  /**
+   * The max message queue size across all instances including controller
+   * will report to ingraph
+   * @return 
+   */
+  public long getMaxMessageQueueSizeGauge();
+  
+  /**
+   * Get all message queue sizes as a string
+   * will NOT report to ingraph
+   * @return
+   */
+  public String getMessageQueueSizes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
new file mode 100644
index 0000000..8a42a28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.log4j.Logger;
+
+
+public class HelixStageLatencyMonitor implements HelixStageLatencyMonitorMBean
+{
+  private static final Logger LOG = Logger.getLogger(HelixStageLatencyMonitor.class);
+
+  private final StatCollector _stgLatency;
+  private final MBeanServer _beanServer;
+  private final String _clusterName;
+  private final String _stageName;
+  private final ObjectName _objectName;
+
+  public HelixStageLatencyMonitor(String clusterName, String stageName) throws Exception
+  {
+    _clusterName = clusterName;
+    _stageName = stageName;
+    _stgLatency = new StatCollector();
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+    _objectName = new ObjectName("StageLatencyMonitor: " + "cluster=" + _clusterName + ",stage=" + _stageName);
+    try
+    {
+      register(this, _objectName);
+    }
+    catch (Exception e)
+    {
+      LOG.error("Couldn't register " + _objectName + " mbean", e);
+      throw e;
+    }
+  }
+
+  private void register(Object bean, ObjectName name) throws Exception
+  {
+    try
+    {
+      _beanServer.unregisterMBean(name);
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+
+    _beanServer.registerMBean(bean, name);
+  }
+
+  private void unregister(ObjectName name)
+  {
+    try
+    {
+      if (_beanServer.isRegistered(name))
+      {
+        _beanServer.unregisterMBean(name);
+      }
+    }
+    catch (Exception e)
+    {
+      LOG.error("Couldn't unregister " + _objectName + " mbean", e);
+    }
+  }
+
+  public void addStgLatency(long time)
+  {
+    _stgLatency.addData(time);
+  }
+
+  public void reset()
+  {
+    _stgLatency.reset();
+    unregister(_objectName);
+  }
+
+  @Override
+  public long getMaxStgLatency()
+  {
+    return (long) _stgLatency.getMax();
+  }
+
+  @Override
+  public long getMeanStgLatency()
+  {
+    return (long) _stgLatency.getMean();
+  }
+
+  @Override
+  public long get95StgLatency()
+  {
+    return (long) _stgLatency.getPercentile(95);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
new file mode 100644
index 0000000..f62a36c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+public interface HelixStageLatencyMonitorMBean
+{
+  public long getMaxStgLatency();
+
+  public long getMeanStgLatency();
+
+  public long get95StgLatency();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
new file mode 100644
index 0000000..879f04f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
@@ -0,0 +1,51 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.log4j.Logger;
+
+
+public class MessageQueueMonitor implements MessageQueueMonitorMBean
+{
+  private static final Logger LOG = Logger.getLogger(MessageQueueMonitor.class);
+
+  private final StatCollector _messageQueueSizeStat;
+  private final String        _clusterName;
+  private final String        _instanceName;
+
+  public MessageQueueMonitor(String clusterName, String instanceName)
+  {
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _messageQueueSizeStat = new StatCollector();
+  }
+
+
+  public void addMessageQueueSize(long size)
+  {
+    _messageQueueSizeStat.addData(size);
+  }
+
+  public void reset()
+  {
+    _messageQueueSizeStat.reset();
+  }
+
+  @Override
+  public double getMaxMessageQueueSize()
+  {
+    return _messageQueueSizeStat.getMax();
+  }
+
+  @Override
+  public double getMeanMessageQueueSize()
+  {
+    return _messageQueueSizeStat.getMean();
+  }
+
+  @Override
+  public String getSensorName()
+  {
+    return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "_" + _clusterName + "_"
+        + _instanceName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
new file mode 100644
index 0000000..2201b1b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
@@ -0,0 +1,19 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface MessageQueueMonitorMBean extends SensorNameProvider
+{
+  /**
+   * Get the max message queue size
+   * @return
+   */
+  public double getMaxMessageQueueSize();
+  
+  /**
+   * Get the mean message queue size
+   * @return
+   */
+  public double getMeanMessageQueueSize();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
new file mode 100644
index 0000000..55b6697
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.log4j.Logger;
+
+
+public class ResourceMonitor implements ResourceMonitorMBean
+{
+  int                         _numOfPartitions;
+  int                         _numOfPartitionsInExternalView;
+  int                         _numOfErrorPartitions;
+  int                         _externalViewIdealStateDiff;
+  private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
+
+  String                      _resourceName, _clusterName;
+
+  public ResourceMonitor(String clusterName, String resourceName)
+  {
+    _clusterName = clusterName;
+    _resourceName = resourceName;
+  }
+
+  @Override
+  public long getPartitionGauge()
+  {
+    return _numOfPartitions;
+  }
+
+  @Override
+  public long getErrorPartitionGauge()
+  {
+    return _numOfErrorPartitions;
+  }
+
+  @Override
+  public long getDifferenceWithIdealStateGauge()
+  {
+    return _externalViewIdealStateDiff;
+  }
+
+  @Override
+  public String getSensorName()
+  {
+    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "_" + _clusterName + "_"
+        + _resourceName;
+  }
+
+  public void updateExternalView(ExternalView externalView, IdealState idealState)
+  {
+    if (externalView == null)
+    {
+      LOG.warn("external view is null");
+      return;
+    }
+    String resourceName = externalView.getId();
+
+    if (idealState == null)
+    {
+      LOG.warn("ideal state is null for " + resourceName);
+      _numOfErrorPartitions = 0;
+      _externalViewIdealStateDiff = 0;
+      _numOfPartitionsInExternalView = 0;
+      return;
+    }
+
+    assert (resourceName.equals(idealState.getId()));
+
+    int numOfErrorPartitions = 0;
+    int numOfDiff = 0;
+
+    if (_numOfPartitions == 0)
+    {
+      _numOfPartitions = idealState.getRecord().getMapFields().size();
+    }
+
+    // TODO fix this; IdealState shall have either map fields (CUSTOM mode)
+    // or list fields (AUDO mode)
+    for (String partitionName : idealState.getRecord().getMapFields().keySet())
+    {
+      Map<String, String> idealRecord = idealState.getInstanceStateMap(partitionName);
+      Map<String, String> externalViewRecord = externalView.getStateMap(partitionName);
+
+      if (externalViewRecord == null)
+      {
+        numOfDiff += idealRecord.size();
+        continue;
+      }
+      for (String host : idealRecord.keySet())
+      {
+        if (!externalViewRecord.containsKey(host)
+            || !externalViewRecord.get(host).equals(idealRecord.get(host)))
+        {
+          numOfDiff++;
+        }
+      }
+
+      for (String host : externalViewRecord.keySet())
+      {
+        if (externalViewRecord.get(host).equalsIgnoreCase("ERROR"))
+        {
+          numOfErrorPartitions++;
+        }
+      }
+    }
+    _numOfErrorPartitions = numOfErrorPartitions;
+    _externalViewIdealStateDiff = numOfDiff;
+    _numOfPartitionsInExternalView = externalView.getPartitionSet().size();
+  }
+
+  @Override
+  public long getExternalViewPartitionGauge()
+  {
+    return _numOfPartitionsInExternalView;
+  }
+
+  public String getBeanName()
+  {
+    return _clusterName + " " + _resourceName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
new file mode 100644
index 0000000..63fab88
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface ResourceMonitorMBean extends SensorNameProvider
+{
+  public long getPartitionGauge();
+  
+  public long getErrorPartitionGauge();
+  
+  public long getDifferenceWithIdealStateGauge();
+  
+  public long getExternalViewPartitionGauge();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
new file mode 100644
index 0000000..582b441
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.helix.monitoring.StateTransitionContext;
+import org.apache.helix.monitoring.StateTransitionDataPoint;
+
+
+public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean
+{
+  public enum LATENCY_TYPE {TOTAL, EXECUTION};
+  
+  private static final int DEFAULT_WINDOW_SIZE = 4000;
+  private long _numDataPoints;
+  private long _successCount;
+  private TimeUnit _unit;
+  
+  private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap
+     = new ConcurrentHashMap<LATENCY_TYPE, StatCollector>();
+  
+  StateTransitionContext _context;
+  
+  public StateTransitionStatMonitor(StateTransitionContext context, TimeUnit unit)
+  {
+    _context = context;
+    _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector());
+    _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector());
+    reset();
+  }
+  
+  public StateTransitionContext getContext()
+  {
+    return _context;
+  }
+  
+  public String getBeanName()
+  {
+    return _context.getClusterName()+" "+_context.getResourceName()+" "+_context.getTransition();
+  }
+  
+  public void addDataPoint(StateTransitionDataPoint data)
+  {
+    _numDataPoints++;
+    if(data.getSuccess())
+    {
+      _successCount++;
+    }
+    // should we count only the transition time for successful transitions?
+    addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
+    addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
+  }
+  
+  void addLatency(LATENCY_TYPE type, double latency)
+  {
+    assert(_monitorMap.containsKey(type));
+    _monitorMap.get(type).addData(latency);
+  }
+  
+  public long getNumDataPoints()
+  {
+    return _numDataPoints;
+  }
+  
+  public void reset()
+  {
+    _numDataPoints = 0;
+    _successCount = 0;
+    for(StatCollector monitor : _monitorMap.values())
+    {
+      monitor.reset();
+    }
+  }
+
+  @Override
+  public long getTotalStateTransitionGauge()
+  {
+    return _numDataPoints;
+  }
+
+  @Override
+  public long getTotalFailedTransitionGauge()
+  {
+    return _numDataPoints - _successCount;
+  }
+
+  @Override
+  public long getTotalSuccessTransitionGauge()
+  {
+    return _successCount;
+  }
+
+  @Override
+  public double getMeanTransitionLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMean();
+  }
+
+  @Override
+  public double getMaxTransitionLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMax();
+  }
+
+  @Override
+  public double getMinTransitionLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMin();
+  }
+
+  @Override
+  public double getPercentileTransitionLatency(int percentage)
+  {
+    return _monitorMap.get(LATENCY_TYPE.TOTAL).getPercentile(percentage);
+  }
+
+  @Override
+  public double getMeanTransitionExecuteLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMean();
+  }
+
+  @Override
+  public double getMaxTransitionExecuteLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMax();
+  }
+
+  @Override
+  public double getMinTransitionExecuteLatency()
+  {
+    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMin();
+  }
+
+  @Override
+  public double getPercentileTransitionExecuteLatency(int percentage)
+  {
+    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
new file mode 100644
index 0000000..057998c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+
+public interface StateTransitionStatMonitorMBean
+{
+  long getTotalStateTransitionGauge();
+  
+  long getTotalFailedTransitionGauge();
+  
+  long getTotalSuccessTransitionGauge();
+  
+  double getMeanTransitionLatency();
+  
+  double getMaxTransitionLatency();
+  
+  double getMinTransitionLatency();
+
+  double getPercentileTransitionLatency(int percentage);
+  
+  double getMeanTransitionExecuteLatency();
+  
+  double getMaxTransitionExecuteLatency();
+  
+  double getMinTransitionExecuteLatency();
+
+  double getPercentileTransitionExecuteLatency(int percentage);
+  
+  void reset();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
new file mode 100644
index 0000000..fa2b18f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix jmx bean classes
+ * 
+ */
+package org.apache.helix.monitoring.mbeans;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java b/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
new file mode 100644
index 0000000..081ca2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix monitoring classes
+ * 
+ */
+package org.apache.helix.monitoring;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/package-info.java b/helix-core/src/main/java/org/apache/helix/package-info.java
new file mode 100644
index 0000000..587cdb0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Provide the classes necessary to create a Helix cluster manager
+ * <p>
+ * General flow 
+ * <blockquote>
+ * <pre>
+ * manager = HelixManagerFactory.getManagerForROLE(); ROLE can be participant, spectator or a controller<br/>
+ * manager.connect();
+ * manager.addSOMEListener();
+ * After connect the subsequent interactions will be via listener onChange callbacks
+ * There will be 3 scenarios for onChange callback, which can be determined using NotificationContext.type
+ * INIT -> will be invoked the first time the listener is added
+ * CALLBACK -> will be invoked due to datachange in the property value
+ * FINALIZE -> will be invoked when listener is removed or session expires
+ * 
+ * manager.disconnect()
+ * </pre>
+ * 
+ * </blockquote> 
+ * 
+ * Default implementations available
+ * 
+ * @see org.apache.helix.participant.HelixStateMachineEngine for participant
+ * @see RoutingTableProvider for spectator
+ * @see GenericHelixController for controller
+ * 
+ * @author kgopalak
+ */
+package org.apache.helix;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
new file mode 100644
index 0000000..bd0ebe0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import org.apache.helix.NotificationContext;
+
+/**
+ * Callback interface to running custom code on Helix participant manager
+ * The custom code will be triggered on user specified cluster changes
+ */
+public interface CustomCodeCallbackHandler
+{
+  /**
+   * callback
+   * @param context
+   */
+  public void onCallback(NotificationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
new file mode 100644
index 0000000..3f712d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.List;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+public class CustomCodeInvoker implements
+    LiveInstanceChangeListener,
+    ConfigChangeListener,
+    ExternalViewChangeListener
+{
+  private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
+  private final CustomCodeCallbackHandler _callback;
+  private final String _partitionKey;
+
+  public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey)
+  {
+    _callback = callback;
+    _partitionKey = partitionKey;
+  }
+
+  private void callParticipantCode(NotificationContext context)
+  {
+    // System.out.println("callback invoked. type:" + context.getType().toString());
+    if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK)
+    {
+      // since ZkClient.unsubscribe() does not immediately remove listeners
+      // from zk, it is possible that two listeners exist when leadership transfers
+      // therefore, double check to make sure only one participant invokes the code
+      if (context.getType() == Type.CALLBACK)
+      {
+        HelixManager manager = context.getManager();
+        // DataAccessor accessor = manager.getDataAccessor();
+        HelixDataAccessor accessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = accessor.keyBuilder();
+
+        String instance = manager.getInstanceName();
+        String sessionId = manager.getSessionId();
+
+        // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
+        String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+
+        CurrentState curState =
+            accessor.getProperty(keyBuilder.currentState(instance,
+                                                         sessionId,
+                                                         resourceName));
+        if (curState == null)
+        {
+          return;
+        }
+
+        String state = curState.getState(_partitionKey);
+        if (state == null || !state.equalsIgnoreCase("LEADER"))
+        {
+          return;
+        }
+      }
+
+      try
+      {
+        _callback.onCallback(context);
+      }
+      catch (Exception e)
+      {
+        LOG.error("Error invoking callback:" + _callback, e);
+      }
+    }
+  }
+
+  @Override
+  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+                                   NotificationContext changeContext)
+  {
+    LOG.info("onLiveInstanceChange() invoked");
+    callParticipantCode(changeContext);
+  }
+
+  @Override
+  public void onConfigChange(List<InstanceConfig> configs,
+                             NotificationContext changeContext)
+  {
+    LOG.info("onConfigChange() invoked");
+    callParticipantCode(changeContext);
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+                                   NotificationContext changeContext)
+  {
+    LOG.info("onExternalViewChange() invoked");
+    callParticipantCode(changeContext);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
new file mode 100644
index 0000000..ea070f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -0,0 +1,199 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+// TODO: merge with GenericHelixController
+public class DistClusterControllerElection implements ControllerChangeListener
+{
+  private static Logger                LOG         =
+                                                       Logger.getLogger(DistClusterControllerElection.class);
+  private final String                 _zkAddr;
+  private final GenericHelixController _controller = new GenericHelixController();
+  private HelixManager                 _leader;
+
+  public DistClusterControllerElection(String zkAddr)
+  {
+    _zkAddr = zkAddr;
+  }
+
+  /**
+   * may be accessed by multiple threads: zk-client thread and
+   * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing HelixMaangerMain class
+   * statically
+   */
+  @Override
+  public synchronized void onControllerChange(NotificationContext changeContext)
+  {
+    HelixManager manager = changeContext.getManager();
+    if (manager == null)
+    {
+      LOG.error("missing attributes in changeContext. requires HelixManager");
+      return;
+    }
+
+    InstanceType type = manager.getInstanceType();
+    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT)
+    {
+      LOG.error("fail to become controller because incorrect instanceType (was "
+          + type.toString() + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+      return;
+    }
+
+    try
+    {
+      if (changeContext.getType().equals(NotificationContext.Type.INIT)
+          || changeContext.getType().equals(NotificationContext.Type.CALLBACK))
+      {
+        // DataAccessor dataAccessor = manager.getDataAccessor();
+        HelixDataAccessor accessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = accessor.keyBuilder();
+
+        while (accessor.getProperty(keyBuilder.controllerLeader()) == null)
+        {
+          boolean success = tryUpdateController(manager);
+          if (success)
+          {
+            updateHistory(manager);
+            if (type == InstanceType.CONTROLLER)
+            {
+              HelixControllerMain.addListenersToController(manager, _controller);
+              manager.startTimerTasks();
+            }
+            else if (type == InstanceType.CONTROLLER_PARTICIPANT)
+            {
+              String clusterName = manager.getClusterName();
+              String controllerName = manager.getInstanceName();
+              _leader =
+                  HelixManagerFactory.getZKHelixManager(clusterName,
+                                                        controllerName,
+                                                        InstanceType.CONTROLLER,
+                                                        _zkAddr);
+
+              _leader.connect();
+              _leader.startTimerTasks();
+              HelixControllerMain.addListenersToController(_leader, _controller);
+            }
+
+          }
+        }
+      }
+      else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE))
+      {
+
+        if (_leader != null)
+        {
+          _leader.disconnect();
+        }
+      }
+
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception when trying to become leader", e);
+    }
+  }
+
+  private boolean tryUpdateController(HelixManager manager)
+  {
+    // DataAccessor dataAccessor = manager.getDataAccessor();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = new LiveInstance(manager.getInstanceName());
+    try
+    {
+      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+      // TODO: this session id is not the leader's session id in distributed mode
+      leader.setSessionId(manager.getSessionId());
+      leader.setHelixVersion(manager.getVersion());
+      if(ZKPropertyTransferServer.getInstance() != null)
+      {
+        String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance().getWebserviceUrl();
+        if(zkPropertyTransferServiceUrl != null)
+        {
+          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
+        }
+      }
+      else
+      {
+        LOG.warn("ZKPropertyTransferServer instnace is null");
+      }
+      boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
+      if (success)
+      {
+        return true;
+      }
+      else
+      {
+        LOG.info("Unable to become leader probably because some other controller becames the leader");
+      }
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception when trying to updating leader record in cluster:"
+                    + manager.getClusterName()
+                    + ". Need to check again whether leader node has been created or not",
+                e);
+    }
+
+    leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader != null)
+    {
+      String leaderName = leader.getInstanceName(); // leader.getLeader();
+      LOG.info("Leader exists for cluster:" + manager.getClusterName()
+          + ", currentLeader:" + leaderName);
+
+      if (leaderName != null && leaderName.equals(manager.getInstanceName()))
+      {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private void updateHistory(HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
+    if (history == null)
+    {
+      history = new LeaderHistory(PropertyType.HISTORY.toString());
+    }
+    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
+    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
+  }
+}