You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[18/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
new file mode 100644
index 0000000..4a44c8b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantMonitor.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
+import org.apache.log4j.Logger;
+
+
+public class ParticipantMonitor
+{
+ private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap
+ = new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
+ private static final Logger LOG = Logger.getLogger(ParticipantMonitor.class);
+
+ private MBeanServer _beanServer;
+
+ public ParticipantMonitor()
+ {
+ try
+ {
+ _beanServer = ManagementFactory.getPlatformMBeanServer();
+ }
+ catch(Exception e)
+ {
+ LOG.warn(e);
+ e.printStackTrace();
+ _beanServer = null;
+ }
+ }
+
+ public void reportTransitionStat(StateTransitionContext cxt,
+ StateTransitionDataPoint data)
+ {
+ if(_beanServer == null)
+ {
+ LOG.warn("bean server is null, skip reporting");
+ return;
+ }
+ try
+ {
+ if(!_monitorMap.containsKey(cxt))
+ {
+ synchronized(this)
+ {
+ if(!_monitorMap.containsKey(cxt))
+ {
+ StateTransitionStatMonitor bean = new StateTransitionStatMonitor(cxt, TimeUnit.MILLISECONDS);
+ _monitorMap.put(cxt, bean);
+ String beanName = cxt.toString();
+ register(bean, getObjectName(beanName));
+ }
+ }
+ }
+ _monitorMap.get(cxt).addDataPoint(data);
+ }
+ catch(Exception e)
+ {
+ LOG.warn(e);
+ e.printStackTrace();
+ }
+ }
+
+
+ private ObjectName getObjectName(String name) throws MalformedObjectNameException
+ {
+ LOG.info("Registering bean: "+name);
+ return new ObjectName("CLMParticipantReport:"+name);
+ }
+
+ private void register(Object bean, ObjectName name)
+ {
+ if(_beanServer == null)
+ {
+ LOG.warn("bean server is null, skip reporting");
+ return;
+ }
+ try
+ {
+ _beanServer.unregisterMBean(name);
+ }
+ catch (Exception e1)
+ {
+ // Swallow silently
+ }
+
+ try
+ {
+ _beanServer.registerMBean(bean, name);
+ }
+ catch (Exception e)
+ {
+ LOG.warn("Could not register MBean", e);
+ }
+ }
+
+ public void shutDown()
+ {
+ for(StateTransitionContext cxt : _monitorMap.keySet() )
+ {
+ try
+ {
+ ObjectName name = getObjectName(cxt.toString());
+ if (_beanServer.isRegistered(name))
+ {
+ _beanServer.unregisterMBean(name);
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.warn("fail to unregister " + cxt.toString(), e);
+ }
+ }
+ _monitorMap.clear();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
new file mode 100644
index 0000000..e5fc914
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/SensorNameProvider.java
@@ -0,0 +1,6 @@
+package org.apache.helix.monitoring;
+
+/**
+ * Implemented by objects that can report a sensor name.
+ */
+public interface SensorNameProvider
+{
+  /**
+   * @return the sensor name of this object
+   */
+  String getSensorName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
new file mode 100644
index 0000000..b4e6ef9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatCollector.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
+
+/**
+ * Collects numeric samples. Descriptive statistics (mean, min, max, percentiles) are computed
+ * over a sliding window of the most recent samples; the number of samples and their sum are
+ * tracked over the collector's whole lifetime (until {@link #reset()}).
+ *
+ * <p>The window is a commons-math {@link SynchronizedDescriptiveStatistics}; the lifetime
+ * counters are guarded by this object's monitor so that they stay consistent with it when the
+ * collector is fed and read from different threads.
+ */
+public class StatCollector
+{
+  private static final int DEFAULT_WINDOW_SIZE = 100;
+  private final DescriptiveStatistics _stats;
+  private long _numDataPoints;
+  // Accumulated as a double: a long accumulator with "+=" silently truncates the fractional
+  // part of every single sample. It is still exposed as a long by getTotalSum().
+  private double _totalSum;
+
+  /** Creates a collector whose statistics window holds the last 100 samples. */
+  public StatCollector()
+  {
+    this(DEFAULT_WINDOW_SIZE);
+  }
+
+  /**
+   * Creates a collector with a custom statistics window size.
+   *
+   * @param windowSize number of most recent samples used for mean/min/max/percentiles;
+   *          invalid sizes are rejected by commons-math {@code setWindowSize}
+   */
+  public StatCollector(int windowSize)
+  {
+    _stats = new SynchronizedDescriptiveStatistics();
+    _stats.setWindowSize(windowSize);
+  }
+
+  /** Adds one sample to the window and to the lifetime counters. */
+  public synchronized void addData(double data)
+  {
+    _numDataPoints++;
+    _totalSum += data;
+    _stats.addValue(data);
+  }
+
+  /** @return the sum of all samples since creation or the last reset, truncated to a long */
+  public synchronized long getTotalSum()
+  {
+    return (long) _totalSum;
+  }
+
+  /** @return the live (windowed) statistics object backing this collector */
+  public DescriptiveStatistics getStatistics()
+  {
+    return _stats;
+  }
+
+  /** @return the number of samples added since creation or the last reset */
+  public synchronized long getNumDataPoints()
+  {
+    return _numDataPoints;
+  }
+
+  /** Clears the window and the lifetime counters. */
+  public synchronized void reset()
+  {
+    _numDataPoints = 0;
+    _totalSum = 0;
+    _stats.clear();
+  }
+
+  /** @return mean of the windowed samples, or 0 when there are none */
+  public double getMean()
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getMean();
+  }
+
+  /**
+   * @return maximum of the windowed samples, or 0 when there are none (consistent with
+   *         {@link #getMean()}; commons-math itself would return NaN)
+   */
+  public double getMax()
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getMax();
+  }
+
+  /**
+   * @return minimum of the windowed samples, or 0 when there are none (consistent with
+   *         {@link #getMean()}; commons-math itself would return NaN)
+   */
+  public double getMin()
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getMin();
+  }
+
+  /**
+   * @param percentage requested percentile; commons-math requires a value in (0, 100]
+   * @return the percentile of the windowed samples, or 0 when there are none
+   */
+  public double getPercentile(int percentage)
+  {
+    if(_stats.getN() == 0)
+    {
+      return 0;
+    }
+    return _stats.getPercentile(percentage*1.0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
new file mode 100644
index 0000000..58f8a04
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+/**
+ * Identifies a group of state transitions for statistics reporting.
+ *
+ * <p>The instance name is carried along but intentionally excluded from {@link #equals},
+ * {@link #hashCode} and {@link #toString}: statistics are gathered per
+ * cluster / resource / transition.
+ */
+public class StateTransitionContext
+{
+  private final String _resourceName;
+  private final String _clusterName;
+  private final String _instanceName;
+  private final String _transition;
+
+  public StateTransitionContext(
+      String clusterName,
+      String instanceName,
+      String resourceName,
+      String transition
+      )
+  {
+    _clusterName = clusterName;
+    _resourceName = resourceName;
+    _transition = transition;
+    _instanceName = instanceName;
+  }
+
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  public String getInstanceName()
+  {
+    return _instanceName;
+  }
+
+  public String getResourceName()
+  {
+    return _resourceName;
+  }
+
+  public String getTransition()
+  {
+    return _transition;
+  }
+
+  /**
+   * Two contexts are equal when cluster, resource and transition match (null-safe).
+   * The instance name is not compared.
+   */
+  @Override
+  public boolean equals(Object other)
+  {
+    if(this == other)
+    {
+      return true;
+    }
+    if(!(other instanceof StateTransitionContext))
+    {
+      return false;
+    }
+
+    StateTransitionContext otherCxt = (StateTransitionContext) other;
+    return
+        nullSafeEquals(_clusterName, otherCxt.getClusterName()) &&
+        nullSafeEquals(_resourceName, otherCxt.getResourceName()) &&
+        nullSafeEquals(_transition, otherCxt.getTransition());
+  }
+
+  /** Compares two possibly-null strings; two nulls are considered equal. */
+  private static boolean nullSafeEquals(String a, String b)
+  {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  // In the report, we will gather per transition time statistics. Derived from toString(),
+  // which contains exactly the fields compared by equals(), so the contract holds.
+  @Override
+  public int hashCode()
+  {
+    return toString().hashCode();
+  }
+
+  /** @return "Cluster=...,Resource=...,Transition=..." (also used as the JMX bean name) */
+  @Override
+  public String toString()
+  {
+    return "Cluster=" + _clusterName + "," +
+           "Resource=" + _resourceName +"," +
+           "Transition=" + _transition;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
new file mode 100644
index 0000000..cdf3024
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+/**
+ * One measured state transition: its total delay, its execution delay and whether it
+ * succeeded. (Delays are presumably in milliseconds; confirm against callers.)
+ */
+public class StateTransitionDataPoint
+{
+  long _totalDelay;
+  long _executionDelay;
+  boolean _isSuccess;
+
+  /**
+   * @param totalDelay     total delay of the transition
+   * @param executionDelay time spent executing the transition
+   * @param isSuccess      whether the transition succeeded
+   */
+  public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess)
+  {
+    this._isSuccess = isSuccess;
+    this._executionDelay = executionDelay;
+    this._totalDelay = totalDelay;
+  }
+
+  /** @return the total delay passed to the constructor */
+  public long getTotalDelay() { return this._totalDelay; }
+
+  /** @return the execution delay passed to the constructor */
+  public long getExecutionDelay() { return this._executionDelay; }
+
+  /** @return whether the transition succeeded */
+  public boolean getSuccess() { return this._isSuccess; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
new file mode 100644
index 0000000..87fbef9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring;
+
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.List;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ZKPathDataDumpTask extends TimerTask
+{
+ static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
+
+ private final int _thresholdNoChangeInMs;
+ private final HelixManager _manager;
+ private final ZkClient _zkClient;
+
+ public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs)
+ {
+ _manager = manager;
+ _zkClient = zkClient;
+ logger.info("Scannning cluster statusUpdate " + manager.getClusterName()
+ + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+ _thresholdNoChangeInMs = thresholdNoChangeInMs;
+ }
+
+ @Override
+ public void run()
+ {
+ // For each record in status update and error node
+ // TODO: for now the status updates are dumped to cluster manager log4j log.
+ // We need to think if we should create per-instance log files that contains
+ // per-instance statusUpdates
+ // and errors
+ logger.info("Scannning status updates ...");
+ try
+ {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+ for (String instanceName : instances)
+ {
+ scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
+ PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
+ scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
+ PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
+ }
+ scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+ PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
+
+ scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+ PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
+ } catch (Exception e)
+ {
+ logger.error(e);
+ e.printStackTrace();
+ }
+ }
+
+ void scanPath(String path, int thresholdNoChangeInMs)
+ {
+ logger.info("Scannning path " + path);
+ List<String> subPaths = _zkClient.getChildren(path);
+ for (String subPath : subPaths)
+ {
+ try
+ {
+ String nextPath = path + "/" + subPath;
+ List<String> subSubPaths = _zkClient.getChildren(nextPath);
+ for (String subsubPath : subSubPaths)
+ {
+ try
+ {
+ checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
+ } catch (Exception e)
+ {
+ logger.error(e);
+ }
+ }
+ } catch (Exception e)
+ {
+ logger.error(e);
+ }
+ }
+ }
+
+ void checkAndDump(String path, int thresholdNoChangeInMs)
+ {
+ List<String> subPaths = _zkClient.getChildren(path);
+ if(subPaths.size() == 0)
+ {
+ subPaths.add("");
+ }
+ for (String subPath : subPaths)
+ {
+ String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
+ Stat pathStat = _zkClient.getStat(fullPath);
+
+ long lastModifiedTimeInMs = pathStat.getMtime();
+ long nowInMs = new Date().getTime();
+ // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
+
+ // Check the last modified time
+ if (nowInMs > lastModifiedTimeInMs)
+ {
+ long timeDiff = nowInMs - lastModifiedTimeInMs;
+ if (timeDiff > thresholdNoChangeInMs)
+ {
+ logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ ZNRecord record = _zkClient.readData(fullPath);
+
+ // dump the node content into log file
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ try
+ {
+ mapper.writeValue(sw, record);
+ logger.info(sw.toString());
+ } catch (Exception e)
+ {
+ logger.warn(
+ "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
+ e);
+ }
+ // Delete the leaf data
+ _zkClient.deleteRecursive(fullPath);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
new file mode 100644
index 0000000..a4dc212
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItem.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+
+import java.util.Date;
+
+import org.apache.helix.alerts.AlertValueAndStatus;
+
+
+/**
+ * JMX view of a single alert: its name, the first value of its alert tuple, whether it is
+ * fired, optional additional info, and the time of its last refresh.
+ */
+public class ClusterAlertItem implements ClusterAlertItemMBean
+{
+  String _alertItemName;
+  double _alertValue;
+  int _alertFired;
+  String _additionalInfo = "";
+  AlertValueAndStatus _valueAndStatus;
+  long _lastUpdateTime = 0;
+
+  public ClusterAlertItem(String name, AlertValueAndStatus valueAndStatus)
+  {
+    _alertItemName = name;
+    _valueAndStatus = valueAndStatus;
+    refreshValues();
+  }
+
+  @Override
+  public String getSensorName()
+  {
+    return _alertItemName;
+  }
+
+  @Override
+  public double getAlertValue()
+  {
+    return _alertValue;
+  }
+
+  /** Replaces the underlying value/status and recomputes the exposed values. */
+  public void setValueMap(AlertValueAndStatus valueAndStatus)
+  {
+    _valueAndStatus = valueAndStatus;
+    refreshValues();
+  }
+
+  /**
+   * Stamps the update time and derives the exposed value (first tuple element parsed as a
+   * double, or 0 when the tuple is empty) and the fired flag (1 or 0).
+   */
+  void refreshValues()
+  {
+    _lastUpdateTime = System.currentTimeMillis();
+    _alertValue = _valueAndStatus.getValue().getElements().size() > 0
+        ? Double.parseDouble(_valueAndStatus.getValue().getElements().get(0))
+        : 0;
+    _alertFired = _valueAndStatus.isFired() ? 1 : 0;
+  }
+
+  @Override
+  public int getAlertFired()
+  {
+    return _alertFired;
+  }
+
+  public void setAdditionalInfo(String additionalInfo)
+  {
+    _additionalInfo = additionalInfo;
+  }
+
+  @Override
+  public String getAdditionalInfo()
+  {
+    return _additionalInfo;
+  }
+
+  /** Clears the fired flag, additional info and value; the last update time is kept. */
+  public void reset()
+  {
+    _alertValue = 0;
+    _additionalInfo = "";
+    _alertFired = 0;
+  }
+
+  /** @return wall-clock time (ms) of the last {@code refreshValues()} call */
+  public long getLastUpdateTime()
+  {
+    return _lastUpdateTime;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
new file mode 100644
index 0000000..1bcf1e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertItemMBean.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+/**
+ * Standard-MBean management interface through which an alert item is exposed over JMX.
+ */
+public interface ClusterAlertItemMBean
+{
+  /** @return the name of the alert item */
+  String getSensorName();
+
+  /** @return the current numeric value of the alert */
+  double getAlertValue();
+
+  /** @return a flag indicating whether the alert is fired (ClusterAlertItem uses 1 / 0) */
+  int getAlertFired();
+
+  /** @return free-form additional information about the alert */
+  String getAdditionalInfo();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
new file mode 100644
index 0000000..a1441e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.alerts.Tuple;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+/**
+ * Maintains the set of alert MBeans of a cluster on the platform MBean server: one
+ * {@link ClusterAlertItem} per evaluated alert plus a per-cluster summary bean, the on/off
+ * delta between refreshes, and removal of beans that have not been updated for a while.
+ */
+public class ClusterAlertMBeanCollection
+{
+  public static String DOMAIN_ALERT = "HelixAlerts";
+  public static String ALERT_SUMMARY = "AlertSummary";
+
+  private static final Logger _logger = Logger.getLogger(ClusterAlertMBeanCollection.class);
+  ConcurrentHashMap<String, ClusterAlertItem> _alertBeans
+    = new ConcurrentHashMap<String, ClusterAlertItem>();
+
+  Map<String, String> _recentAlertDelta;
+  ClusterAlertSummary _clusterAlertSummary;
+  ZNRecord _alertHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
+  Set<String> _previousFiredAlerts = new HashSet<String>();
+  // 5 min for mbean freshness threshold
+  public static final long ALERT_NOCHANGE_THRESHOLD = 5 * 60 * 1000;
+
+  final MBeanServer _beanServer;
+
+  public interface ClusterAlertSummaryMBean extends ClusterAlertItemMBean
+  {
+    public String getAlertFiredHistory();
+  }
+
+  class ClusterAlertSummary extends ClusterAlertItem implements ClusterAlertSummaryMBean
+  {
+    public ClusterAlertSummary(String name, AlertValueAndStatus valueAndStatus)
+    {
+      super(name, valueAndStatus);
+    }
+
+    /**
+     * Returns the alert turn on / off history record as indented JSON, or "" if it cannot
+     * be serialized.
+     */
+    @Override
+    public String getAlertFiredHistory()
+    {
+      try
+      {
+        ObjectMapper mapper = new ObjectMapper();
+        SerializationConfig serializationConfig = mapper.getSerializationConfig();
+        serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+        StringWriter sw = new StringWriter();
+        mapper.writeValue(sw, _alertHistory);
+        return sw.toString();
+      }
+      catch(Exception e)
+      {
+        _logger.warn("Fail to serialize alert history", e);
+        return "";
+      }
+    }
+  }
+
+  public ClusterAlertMBeanCollection()
+  {
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+  }
+
+  /** Builds the JMX object name under which the alert bean with this sensor name is exposed. */
+  private static ObjectName getAlertObjectName(String sensorName)
+      throws javax.management.MalformedObjectNameException
+  {
+    return new ObjectName(DOMAIN_ALERT + ":alert=" + sensorName);
+  }
+
+  /** @return a snapshot of all alert beans currently held, including the summary bean */
+  public Collection<ClusterAlertItemMBean> getCurrentAlertMBeans()
+  {
+    return new ArrayList<ClusterAlertItemMBean>(_alertBeans.values());
+  }
+
+  /** Exposes a newly created alert bean on the MBean server; failures are only logged. */
+  void onNewAlertMbeanAdded(ClusterAlertItemMBean bean)
+  {
+    try
+    {
+      _logger.info("alert bean " + bean.getSensorName()+" exposed to jmx");
+      register(bean, getAlertObjectName(bean.getSensorName()));
+    }
+    catch (Exception e)
+    {
+      _logger.error("Fail to expose alert bean " + bean.getSensorName() + " to jmx", e);
+    }
+  }
+
+  /**
+   * Creates or updates one alert bean per entry of {@code alertResultMap}, then refreshes the
+   * cluster summary bean.
+   *
+   * @param originAlert    the alert expression the results were evaluated from
+   * @param alertResultMap evaluated alert name to value/status; ignored (with a warning) if null
+   * @param clusterName    cluster whose summary bean is refreshed
+   */
+  public void setAlerts(String originAlert, Map<String, AlertValueAndStatus> alertResultMap, String clusterName)
+  {
+    if(alertResultMap == null)
+    {
+      _logger.warn("null alertResultMap");
+      return;
+    }
+    for(Map.Entry<String, AlertValueAndStatus> entry : alertResultMap.entrySet())
+    {
+      String alertName = entry.getKey();
+      String beanName;
+      if(alertName.length() > 1)
+      {
+        String comparator = AlertParser.getComponent(AlertParser.COMPARATOR_NAME, originAlert);
+        String constant = AlertParser.getComponent(AlertParser.CONSTANT_NAME, originAlert);
+        beanName = "("+ alertName+")" + comparator + "("+constant+")";
+      }
+      else
+      {
+        beanName = originAlert + "--(" + alertName + ")";
+      }
+      // This is to make JMX happy; certain characters cannot be in a JMX bean name
+      beanName = beanName.replace('*', '%').replace('=', '#').replace(',', ';');
+      ClusterAlertItem existing = _alertBeans.get(beanName);
+      if(existing == null)
+      {
+        ClusterAlertItem item = new ClusterAlertItem(beanName, entry.getValue());
+        onNewAlertMbeanAdded(item);
+        _alertBeans.put(beanName, item);
+      }
+      else
+      {
+        existing.setValueMap(entry.getValue());
+      }
+    }
+    refreshSummayAlert(clusterName);
+  }
+
+  public void setAlertHistory(ZNRecord alertHistory)
+  {
+    _alertHistory = alertHistory;
+  }
+
+  /**
+   * The summary alert is a combination of all alerts: if it is on, something is wrong on this
+   * cluster. Its additional info lists the names of all fired alert beans, ';'-terminated.
+   */
+  void refreshSummayAlert(String clusterName)
+  {
+    boolean fired = false;
+    StringBuilder alertsFired = new StringBuilder();
+    String summaryKey = ALERT_SUMMARY + "_" + clusterName;
+    // Iterate entries directly: a get() after keySet() can return null if a bean is removed
+    // concurrently (e.g. by checkMBeanFreshness).
+    for(Map.Entry<String, ClusterAlertItem> entry : _alertBeans.entrySet())
+    {
+      if(entry.getKey().equals(summaryKey))
+      {
+        continue;
+      }
+      ClusterAlertItem item = entry.getValue();
+      if(item.getAlertFired() == 1)
+      {
+        fired = true;
+        alertsFired.append(item._alertItemName).append(';');
+      }
+    }
+    Tuple<String> t = new Tuple<String>();
+    t.add("0");
+    AlertValueAndStatus summaryStatus = new AlertValueAndStatus(t, fired);
+    ClusterAlertItem summary = _alertBeans.get(summaryKey);
+    if(summary == null)
+    {
+      ClusterAlertSummary item = new ClusterAlertSummary(summaryKey, summaryStatus);
+      onNewAlertMbeanAdded(item);
+      item.setAdditionalInfo(alertsFired.toString());
+      _alertBeans.put(summaryKey, item);
+      _clusterAlertSummary = item;
+    }
+    else
+    {
+      summary.setValueMap(summaryStatus);
+      summary.setAdditionalInfo(alertsFired.toString());
+    }
+  }
+
+  /** Registers the bean under the given name, replacing any bean already registered there. */
+  void register(Object bean, ObjectName name)
+  {
+    try
+    {
+      _beanServer.unregisterMBean(name);
+    }
+    catch (Exception ignored)
+    {
+      // Expected when no bean is registered under this name yet.
+    }
+    try
+    {
+      _beanServer.registerMBean(bean, name);
+    }
+    catch (Exception e)
+    {
+      _logger.error("Could not register MBean", e);
+    }
+  }
+
+  /** Resets and unregisters every alert bean, then forgets all of them. */
+  public void reset()
+  {
+    for(ClusterAlertItem item : _alertBeans.values())
+    {
+      item.reset();
+      try
+      {
+        _beanServer.unregisterMBean(getAlertObjectName(item.getSensorName()));
+      }
+      catch (Exception e)
+      {
+        _logger.warn("Fail to unregister alert bean " + item.getSensorName(), e);
+      }
+    }
+    _alertBeans.clear();
+  }
+
+  /**
+   * Computes which alerts turned ON or OFF since the previous call (excluding the summary
+   * bean), stores that delta for {@link #getRecentAlertDelta()}, and then drops beans that
+   * have not been updated within {@link #ALERT_NOCHANGE_THRESHOLD}.
+   */
+  public void refreshAlertDelta(String clusterName)
+  {
+    // Update the alert turn on/turn off history
+    String summaryKey = ALERT_SUMMARY + "_" + clusterName;
+    Set<String> currentFiredAlerts = new HashSet<String>();
+    for(Map.Entry<String, ClusterAlertItem> entry : _alertBeans.entrySet())
+    {
+      if(!entry.getKey().equals(summaryKey) && entry.getValue().getAlertFired() == 1)
+      {
+        currentFiredAlerts.add(entry.getValue()._alertItemName);
+      }
+    }
+
+    Map<String, String> onOffAlertsMap = new HashMap<String, String>();
+    for(String alertName : currentFiredAlerts)
+    {
+      if(!_previousFiredAlerts.contains(alertName))
+      {
+        onOffAlertsMap.put(alertName, "ON");
+        _logger.info(alertName + " ON");
+      }
+    }
+    for(String cachedAlert : _previousFiredAlerts)
+    {
+      if(!currentFiredAlerts.contains(cachedAlert))
+      {
+        onOffAlertsMap.put(cachedAlert, "OFF");
+        _logger.info(cachedAlert + " OFF");
+      }
+    }
+    // After this refresh the previously-fired set is exactly the currently-fired set.
+    _previousFiredAlerts.clear();
+    _previousFiredAlerts.addAll(currentFiredAlerts);
+
+    if(onOffAlertsMap.isEmpty())
+    {
+      _logger.info("No MBean change");
+    }
+    _recentAlertDelta = onOffAlertsMap;
+
+    checkMBeanFreshness(ALERT_NOCHANGE_THRESHOLD);
+  }
+
+  /** @return the ON/OFF delta computed by the last refreshAlertDelta(), or null before it */
+  public Map<String, String> getRecentAlertDelta()
+  {
+    return _recentAlertDelta;
+  }
+
+  /**
+   * Removes (resets and unregisters) mbeans that have not been updated for more than
+   * {@code thresholdInMs} milliseconds.
+   */
+  void checkMBeanFreshness(long thresholdInMs)
+  {
+    long now = System.currentTimeMillis();
+    Set<String> oldBeanNames = new HashSet<String>();
+    // Get mbean items that has not been updated for thresholdInMs
+    for(Map.Entry<String, ClusterAlertItem> entry : _alertBeans.entrySet())
+    {
+      if(now - entry.getValue().getLastUpdateTime() > thresholdInMs)
+      {
+        oldBeanNames.add(entry.getKey());
+        _logger.info("bean " + entry.getKey()+" has not been updated for "+ thresholdInMs +" MS");
+      }
+    }
+    for(String beanName : oldBeanNames)
+    {
+      ClusterAlertItem item = _alertBeans.remove(beanName);
+      if(item == null)
+      {
+        // already removed concurrently
+        continue;
+      }
+      try
+      {
+        item.reset();
+        _beanServer.unregisterMBean(getAlertObjectName(item.getSensorName()));
+      }
+      catch (Exception e)
+      {
+        _logger.warn("Fail to unregister stale alert bean " + beanName, e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
new file mode 100644
index 0000000..dde8a49
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ReflectionException;
+import javax.management.relation.MBeanServerNotificationFilter;
+
+import org.apache.log4j.Logger;
+
+/*
+ * TODO: this class should be in espresso common, as the only usage of it is
+ * to create ingraph adaptors
+ * **/
+public abstract class ClusterMBeanObserver implements NotificationListener
+{
+ protected final String _domain;
+ protected MBeanServerConnection _server;
+ private static final Logger _logger = Logger.getLogger(ClusterMBeanObserver.class);
+
+ public ClusterMBeanObserver(String domain)
+ throws InstanceNotFoundException, IOException, MalformedObjectNameException, NullPointerException
+ {
+ // Get a reference to the target MBeanServer
+ _domain = domain;
+ _server = ManagementFactory.getPlatformMBeanServer();
+ MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+ filter.enableAllObjectNames();
+ _server.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
+ }
+
+ public void handleNotification(Notification notification, Object handback)
+ {
+ MBeanServerNotification mbs = (MBeanServerNotification) notification;
+ if(MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(mbs.getType()))
+ {
+ if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+ {
+ _logger.info("MBean Registered, name :" + mbs.getMBeanName());
+ onMBeanRegistered(_server, mbs);
+ }
+ }
+ else if(MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(mbs.getType()))
+ {
+ if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+ {
+ _logger.info("MBean Unregistered, name :" + mbs.getMBeanName());
+ onMBeanUnRegistered(_server, mbs);
+ }
+ }
+ }
+
+ public void disconnect()
+ {
+ MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+ try
+ {
+ _server.removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this);
+ }
+ catch (Exception e)
+ {
+ _logger.error("", e);
+ }
+ }
+
+ public abstract void onMBeanRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);
+
+ public abstract void onMBeanUnRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
new file mode 100644
index 0000000..4499add
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.log4j.Logger;
+
+
+public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
+{
+ private static final Logger LOG =
+ Logger.getLogger(ClusterStatusMonitor.class);
+
+ static final String CLUSTER_STATUS_KEY =
+ "ClusterStatus";
+ static final String MESSAGE_QUEUE_STATUS_KEY =
+ "MessageQueueStatus";
+ static final String RESOURCE_STATUS_KEY =
+ "ResourceStatus";
+ static final String CLUSTER_DN_KEY =
+ "cluster";
+ static final String RESOURCE_DN_KEY =
+ "resourceName";
+ static final String INSTANCE_DN_KEY =
+ "instanceName";
+
+ private final String _clusterName;
+ private final MBeanServer _beanServer;
+
+ private int _numOfLiveInstances =
+ 0;
+ private int _numOfInstances =
+ 0;
+ private int _numOfDisabledInstances =
+ 0;
+ private int _numOfDisabledPartitions =
+ 0;
+
+ private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
+ new ConcurrentHashMap<String, ResourceMonitor>();
+
+ private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
+ new ConcurrentHashMap<String, MessageQueueMonitor>();
+
+ public ClusterStatusMonitor(String clusterName)
+ {
+ _clusterName = clusterName;
+ _beanServer = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ }
+ catch (Exception e)
+ {
+ LOG.error("Register self failed.", e);
+ }
+ }
+
+ public ObjectName getObjectName(String name) throws MalformedObjectNameException
+ {
+ return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+ }
+
+ // Used by other external JMX consumers like ingraph
+ public String getBeanName()
+ {
+ return CLUSTER_STATUS_KEY + " " + _clusterName;
+ }
+
+ @Override
+ public long getDownInstanceGauge()
+ {
+ return _numOfInstances - _numOfLiveInstances;
+ }
+
+ @Override
+ public long getInstancesGauge()
+ {
+ return _numOfInstances;
+ }
+
+ @Override
+ public long getDisabledInstancesGauge()
+ {
+ return _numOfDisabledInstances;
+ }
+
+ @Override
+ public long getDisabledPartitionsGauge()
+ {
+ return _numOfDisabledPartitions;
+ }
+
+ @Override
+ public long getMaxMessageQueueSizeGauge()
+ {
+ long maxQueueSize = 0;
+ for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values())
+ {
+ if (msgQueue.getMaxMessageQueueSize() > maxQueueSize)
+ {
+ maxQueueSize = (long)msgQueue.getMaxMessageQueueSize();
+ }
+ }
+
+ return maxQueueSize;
+ }
+
+ @Override
+ public String getMessageQueueSizes()
+ {
+ Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
+ for (String instance : _instanceMsgQueueMbeanMap.keySet())
+ {
+ MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
+ msgQueueSizes.put(instance, new Long( (long)msgQueue.getMaxMessageQueueSize()));
+ }
+
+ return msgQueueSizes.toString();
+ }
+
+ private void register(Object bean, ObjectName name)
+ {
+ try
+ {
+ if (_beanServer.isRegistered(name))
+ {
+ _beanServer.unregisterMBean(name);
+ }
+ }
+ catch (Exception e)
+ {
+ // OK
+ }
+
+ try
+ {
+ LOG.info("Registering " + name.toString());
+ _beanServer.registerMBean(bean, name);
+ }
+ catch (Exception e)
+ {
+ LOG.warn("Could not register MBean" + name, e);
+ }
+ }
+
+ private void unregister(ObjectName name)
+ {
+ try
+ {
+ if (_beanServer.isRegistered(name))
+ {
+ LOG.info("Unregistering " + name.toString());
+ _beanServer.unregisterMBean(name);
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.warn("Could not unregister MBean" + name, e);
+ }
+ }
+
+ public void setClusterStatusCounters(int numberLiveInstances,
+ int numberOfInstances,
+ int disabledInstances,
+ int disabledPartitions)
+ {
+ _numOfInstances = numberOfInstances;
+ _numOfLiveInstances = numberLiveInstances;
+ _numOfDisabledInstances = disabledInstances;
+ _numOfDisabledPartitions = disabledPartitions;
+ }
+
+ public void onExternalViewChange(ExternalView externalView, IdealState idealState)
+ {
+ try
+ {
+ String resourceName = externalView.getId();
+ if (!_resourceMbeanMap.containsKey(resourceName))
+ {
+ synchronized (this)
+ {
+ if (!_resourceMbeanMap.containsKey(resourceName))
+ {
+ ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
+ String beanName =
+ CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
+ + resourceName;
+ register(bean, getObjectName(beanName));
+ _resourceMbeanMap.put(resourceName, bean);
+ }
+ }
+ }
+ _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
+ }
+ catch (Exception e)
+ {
+ LOG.warn(e);
+ }
+ }
+
+ public void addMessageQueueSize(String instanceName, int msgQueueSize)
+ {
+ try
+ {
+ if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
+ {
+ synchronized (this)
+ {
+ if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
+ {
+ MessageQueueMonitor bean =
+ new MessageQueueMonitor(_clusterName, instanceName);
+ _instanceMsgQueueMbeanMap.put(instanceName, bean);
+ }
+ }
+ }
+ _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
+ }
+ catch (Exception e)
+ {
+ LOG.warn("fail to add message queue size to mbean", e);
+ }
+ }
+
+ public void reset()
+ {
+ LOG.info("Resetting ClusterStatusMonitor");
+ try
+ {
+ for (String resourceName : _resourceMbeanMap.keySet())
+ {
+ String beanName =
+ CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
+ + resourceName;
+ unregister(getObjectName(beanName));
+ }
+ _resourceMbeanMap.clear();
+
+ for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values())
+ {
+ bean.reset();
+ }
+ _instanceMsgQueueMbeanMap.clear();
+
+ unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ }
+ catch (Exception e)
+ {
+ LOG.error("fail to reset ClusterStatusMonitor", e);
+ }
+ }
+
+ @Override
+ public String getSensorName()
+ {
+ return CLUSTER_STATUS_KEY + "_" + _clusterName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
new file mode 100644
index 0000000..bfdb9c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface ClusterStatusMonitorMBean extends SensorNameProvider
+{
+ public long getDownInstanceGauge();
+
+ public long getInstancesGauge();
+
+ public long getDisabledInstancesGauge();
+
+ public long getDisabledPartitionsGauge();
+
+ /**
+ * The max message queue size across all instances including controller
+ * will report to ingraph
+ * @return
+ */
+ public long getMaxMessageQueueSizeGauge();
+
+ /**
+ * Get all message queue sizes as a string
+ * will NOT report to ingraph
+ * @return
+ */
+ public String getMessageQueueSizes();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
new file mode 100644
index 0000000..8a42a28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.log4j.Logger;
+
+
+public class HelixStageLatencyMonitor implements HelixStageLatencyMonitorMBean
+{
+ private static final Logger LOG = Logger.getLogger(HelixStageLatencyMonitor.class);
+
+ private final StatCollector _stgLatency;
+ private final MBeanServer _beanServer;
+ private final String _clusterName;
+ private final String _stageName;
+ private final ObjectName _objectName;
+
+ public HelixStageLatencyMonitor(String clusterName, String stageName) throws Exception
+ {
+ _clusterName = clusterName;
+ _stageName = stageName;
+ _stgLatency = new StatCollector();
+ _beanServer = ManagementFactory.getPlatformMBeanServer();
+ _objectName = new ObjectName("StageLatencyMonitor: " + "cluster=" + _clusterName + ",stage=" + _stageName);
+ try
+ {
+ register(this, _objectName);
+ }
+ catch (Exception e)
+ {
+ LOG.error("Couldn't register " + _objectName + " mbean", e);
+ throw e;
+ }
+ }
+
+ private void register(Object bean, ObjectName name) throws Exception
+ {
+ try
+ {
+ _beanServer.unregisterMBean(name);
+ }
+ catch (Exception e)
+ {
+ // OK
+ }
+
+ _beanServer.registerMBean(bean, name);
+ }
+
+ private void unregister(ObjectName name)
+ {
+ try
+ {
+ if (_beanServer.isRegistered(name))
+ {
+ _beanServer.unregisterMBean(name);
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.error("Couldn't unregister " + _objectName + " mbean", e);
+ }
+ }
+
+ public void addStgLatency(long time)
+ {
+ _stgLatency.addData(time);
+ }
+
+ public void reset()
+ {
+ _stgLatency.reset();
+ unregister(_objectName);
+ }
+
+ @Override
+ public long getMaxStgLatency()
+ {
+ return (long) _stgLatency.getMax();
+ }
+
+ @Override
+ public long getMeanStgLatency()
+ {
+ return (long) _stgLatency.getMean();
+ }
+
+ @Override
+ public long get95StgLatency()
+ {
+ return (long) _stgLatency.getPercentile(95);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
new file mode 100644
index 0000000..f62a36c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
/**
 * JMX interface exposing latency statistics collected for a single Helix stage.
 * Values are truncated to whole numbers.
 */
public interface HelixStageLatencyMonitorMBean
{
  /** @return the maximum recorded latency of the stage */
  long getMaxStgLatency();

  /** @return the mean recorded latency of the stage */
  long getMeanStgLatency();

  /** @return the 95th-percentile recorded latency of the stage */
  long get95StgLatency();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
new file mode 100644
index 0000000..879f04f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
@@ -0,0 +1,51 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.log4j.Logger;
+
+
+public class MessageQueueMonitor implements MessageQueueMonitorMBean
+{
+ private static final Logger LOG = Logger.getLogger(MessageQueueMonitor.class);
+
+ private final StatCollector _messageQueueSizeStat;
+ private final String _clusterName;
+ private final String _instanceName;
+
+ public MessageQueueMonitor(String clusterName, String instanceName)
+ {
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _messageQueueSizeStat = new StatCollector();
+ }
+
+
+ public void addMessageQueueSize(long size)
+ {
+ _messageQueueSizeStat.addData(size);
+ }
+
+ public void reset()
+ {
+ _messageQueueSizeStat.reset();
+ }
+
+ @Override
+ public double getMaxMessageQueueSize()
+ {
+ return _messageQueueSizeStat.getMax();
+ }
+
+ @Override
+ public double getMeanMessageQueueSize()
+ {
+ return _messageQueueSizeStat.getMean();
+ }
+
+ @Override
+ public String getSensorName()
+ {
+ return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "_" + _clusterName + "_"
+ + _instanceName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
new file mode 100644
index 0000000..2201b1b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
@@ -0,0 +1,19 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface MessageQueueMonitorMBean extends SensorNameProvider
+{
+ /**
+ * Get the max message queue size
+ * @return
+ */
+ public double getMaxMessageQueueSize();
+
+ /**
+ * Get the mean message queue size
+ * @return
+ */
+ public double getMeanMessageQueueSize();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
new file mode 100644
index 0000000..55b6697
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.log4j.Logger;
+
+
+public class ResourceMonitor implements ResourceMonitorMBean
+{
+ int _numOfPartitions;
+ int _numOfPartitionsInExternalView;
+ int _numOfErrorPartitions;
+ int _externalViewIdealStateDiff;
+ private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
+
+ String _resourceName, _clusterName;
+
+ public ResourceMonitor(String clusterName, String resourceName)
+ {
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ }
+
+ @Override
+ public long getPartitionGauge()
+ {
+ return _numOfPartitions;
+ }
+
+ @Override
+ public long getErrorPartitionGauge()
+ {
+ return _numOfErrorPartitions;
+ }
+
+ @Override
+ public long getDifferenceWithIdealStateGauge()
+ {
+ return _externalViewIdealStateDiff;
+ }
+
+ @Override
+ public String getSensorName()
+ {
+ return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "_" + _clusterName + "_"
+ + _resourceName;
+ }
+
+ public void updateExternalView(ExternalView externalView, IdealState idealState)
+ {
+ if (externalView == null)
+ {
+ LOG.warn("external view is null");
+ return;
+ }
+ String resourceName = externalView.getId();
+
+ if (idealState == null)
+ {
+ LOG.warn("ideal state is null for " + resourceName);
+ _numOfErrorPartitions = 0;
+ _externalViewIdealStateDiff = 0;
+ _numOfPartitionsInExternalView = 0;
+ return;
+ }
+
+ assert (resourceName.equals(idealState.getId()));
+
+ int numOfErrorPartitions = 0;
+ int numOfDiff = 0;
+
+ if (_numOfPartitions == 0)
+ {
+ _numOfPartitions = idealState.getRecord().getMapFields().size();
+ }
+
+ // TODO fix this; IdealState shall have either map fields (CUSTOM mode)
+ // or list fields (AUDO mode)
+ for (String partitionName : idealState.getRecord().getMapFields().keySet())
+ {
+ Map<String, String> idealRecord = idealState.getInstanceStateMap(partitionName);
+ Map<String, String> externalViewRecord = externalView.getStateMap(partitionName);
+
+ if (externalViewRecord == null)
+ {
+ numOfDiff += idealRecord.size();
+ continue;
+ }
+ for (String host : idealRecord.keySet())
+ {
+ if (!externalViewRecord.containsKey(host)
+ || !externalViewRecord.get(host).equals(idealRecord.get(host)))
+ {
+ numOfDiff++;
+ }
+ }
+
+ for (String host : externalViewRecord.keySet())
+ {
+ if (externalViewRecord.get(host).equalsIgnoreCase("ERROR"))
+ {
+ numOfErrorPartitions++;
+ }
+ }
+ }
+ _numOfErrorPartitions = numOfErrorPartitions;
+ _externalViewIdealStateDiff = numOfDiff;
+ _numOfPartitionsInExternalView = externalView.getPartitionSet().size();
+ }
+
+ @Override
+ public long getExternalViewPartitionGauge()
+ {
+ return _numOfPartitionsInExternalView;
+ }
+
+ public String getBeanName()
+ {
+ return _clusterName + " " + _resourceName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
new file mode 100644
index 0000000..63fab88
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+public interface ResourceMonitorMBean extends SensorNameProvider
+{
+ public long getPartitionGauge();
+
+ public long getErrorPartitionGauge();
+
+ public long getDifferenceWithIdealStateGauge();
+
+ public long getExternalViewPartitionGauge();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
new file mode 100644
index 0000000..582b441
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.helix.monitoring.StatCollector;
+import org.apache.helix.monitoring.StateTransitionContext;
+import org.apache.helix.monitoring.StateTransitionDataPoint;
+
+
+public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean
+{
+ public enum LATENCY_TYPE {TOTAL, EXECUTION};
+
+ private static final int DEFAULT_WINDOW_SIZE = 4000;
+ private long _numDataPoints;
+ private long _successCount;
+ private TimeUnit _unit;
+
+ private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap
+ = new ConcurrentHashMap<LATENCY_TYPE, StatCollector>();
+
+ StateTransitionContext _context;
+
+ public StateTransitionStatMonitor(StateTransitionContext context, TimeUnit unit)
+ {
+ _context = context;
+ _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector());
+ _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector());
+ reset();
+ }
+
+ public StateTransitionContext getContext()
+ {
+ return _context;
+ }
+
+ public String getBeanName()
+ {
+ return _context.getClusterName()+" "+_context.getResourceName()+" "+_context.getTransition();
+ }
+
+ public void addDataPoint(StateTransitionDataPoint data)
+ {
+ _numDataPoints++;
+ if(data.getSuccess())
+ {
+ _successCount++;
+ }
+ // should we count only the transition time for successful transitions?
+ addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
+ addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
+ }
+
+ void addLatency(LATENCY_TYPE type, double latency)
+ {
+ assert(_monitorMap.containsKey(type));
+ _monitorMap.get(type).addData(latency);
+ }
+
+ public long getNumDataPoints()
+ {
+ return _numDataPoints;
+ }
+
+ public void reset()
+ {
+ _numDataPoints = 0;
+ _successCount = 0;
+ for(StatCollector monitor : _monitorMap.values())
+ {
+ monitor.reset();
+ }
+ }
+
+ @Override
+ public long getTotalStateTransitionGauge()
+ {
+ return _numDataPoints;
+ }
+
+ @Override
+ public long getTotalFailedTransitionGauge()
+ {
+ return _numDataPoints - _successCount;
+ }
+
+ @Override
+ public long getTotalSuccessTransitionGauge()
+ {
+ return _successCount;
+ }
+
+ @Override
+ public double getMeanTransitionLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.TOTAL).getMean();
+ }
+
+ @Override
+ public double getMaxTransitionLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.TOTAL).getMax();
+ }
+
+ @Override
+ public double getMinTransitionLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.TOTAL).getMin();
+ }
+
+ @Override
+ public double getPercentileTransitionLatency(int percentage)
+ {
+ return _monitorMap.get(LATENCY_TYPE.TOTAL).getPercentile(percentage);
+ }
+
+ @Override
+ public double getMeanTransitionExecuteLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMean();
+ }
+
+ @Override
+ public double getMaxTransitionExecuteLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMax();
+ }
+
+ @Override
+ public double getMinTransitionExecuteLatency()
+ {
+ return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMin();
+ }
+
+ @Override
+ public double getPercentileTransitionExecuteLatency(int percentage)
+ {
+ return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
new file mode 100644
index 0000000..057998c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.monitoring.mbeans;
+
+
+/**
+ * JMX management interface exposing state transition statistics: transition counts,
+ * end-to-end transition latency, and latency of the execution step alone.
+ */
+public interface StateTransitionStatMonitorMBean
+{
+  /** @return total number of state transitions, successful plus failed */
+  long getTotalStateTransitionGauge();
+
+  /** @return number of failed state transitions */
+  long getTotalFailedTransitionGauge();
+
+  /** @return number of successful state transitions */
+  long getTotalSuccessTransitionGauge();
+
+  /** @return mean end-to-end transition latency */
+  double getMeanTransitionLatency();
+
+  /** @return maximum end-to-end transition latency */
+  double getMaxTransitionLatency();
+
+  /** @return minimum end-to-end transition latency */
+  double getMinTransitionLatency();
+
+  /**
+   * @param percentage percentile to compute
+   * @return the given percentile of end-to-end transition latency
+   */
+  double getPercentileTransitionLatency(int percentage);
+
+  /** @return mean latency of the transition execution step */
+  double getMeanTransitionExecuteLatency();
+
+  /** @return maximum latency of the transition execution step */
+  double getMaxTransitionExecuteLatency();
+
+  /** @return minimum latency of the transition execution step */
+  double getMinTransitionExecuteLatency();
+
+  /**
+   * @param percentage percentile to compute
+   * @return the given percentile of transition execution latency
+   */
+  double getPercentileTransitionExecuteLatency(int percentage);
+
+  /** Resets the collected statistics. */
+  void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
new file mode 100644
index 0000000..fa2b18f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix JMX bean classes.
+ *
+ */
+package org.apache.helix.monitoring.mbeans;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java b/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
new file mode 100644
index 0000000..081ca2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix monitoring classes
+ *
+ */
+package org.apache.helix.monitoring;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/package-info.java b/helix-core/src/main/java/org/apache/helix/package-info.java
new file mode 100644
index 0000000..587cdb0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Provide the classes necessary to create a Helix cluster manager
+ * <p>
+ * General flow
+ * <blockquote>
+ * <pre>
+ * manager = HelixManagerFactory.getManagerForROLE(); ROLE can be participant, spectator or controller
+ * manager.connect();
+ * manager.addSOMEListener();
+ * After connect(), subsequent interactions happen via listener onChange callbacks.
+ * There are 3 kinds of onChange callback, distinguished by NotificationContext.getType():
+ * INIT -&gt; invoked the first time the listener is added
+ * CALLBACK -&gt; invoked due to a data change in the property value
+ * FINALIZE -&gt; invoked when the listener is removed or the session expires
+ *
+ * manager.disconnect();
+ * </pre>
+ *
+ * </blockquote>
+ *
+ * Default implementations available
+ *
+ * @see org.apache.helix.participant.HelixStateMachineEngine for participant
+ * @see RoutingTableProvider for spectator
+ * @see org.apache.helix.controller.GenericHelixController for controller
+ *
+ * @author kgopalak
+ */
+package org.apache.helix;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
new file mode 100644
index 0000000..bd0ebe0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeCallbackHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import org.apache.helix.NotificationContext;
+
+/**
+ * Callback interface for running custom code on a Helix participant manager.
+ * The custom code is triggered by user-specified cluster changes.
+ */
+public interface CustomCodeCallbackHandler
+{
+  /**
+   * Invoked when one of the watched cluster changes fires.
+   *
+   * @param context notification context of the change that triggered this callback
+   */
+  void onCallback(NotificationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
new file mode 100644
index 0000000..3f712d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.util.List;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+public class CustomCodeInvoker implements
+ LiveInstanceChangeListener,
+ ConfigChangeListener,
+ ExternalViewChangeListener
+{
+ private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
+ private final CustomCodeCallbackHandler _callback;
+ private final String _partitionKey;
+
+ public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey)
+ {
+ _callback = callback;
+ _partitionKey = partitionKey;
+ }
+
+ private void callParticipantCode(NotificationContext context)
+ {
+ // System.out.println("callback invoked. type:" + context.getType().toString());
+ if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK)
+ {
+ // since ZkClient.unsubscribe() does not immediately remove listeners
+ // from zk, it is possible that two listeners exist when leadership transfers
+ // therefore, double check to make sure only one participant invokes the code
+ if (context.getType() == Type.CALLBACK)
+ {
+ HelixManager manager = context.getManager();
+ // DataAccessor accessor = manager.getDataAccessor();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ String instance = manager.getInstanceName();
+ String sessionId = manager.getSessionId();
+
+ // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
+ String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+
+ CurrentState curState =
+ accessor.getProperty(keyBuilder.currentState(instance,
+ sessionId,
+ resourceName));
+ if (curState == null)
+ {
+ return;
+ }
+
+ String state = curState.getState(_partitionKey);
+ if (state == null || !state.equalsIgnoreCase("LEADER"))
+ {
+ return;
+ }
+ }
+
+ try
+ {
+ _callback.onCallback(context);
+ }
+ catch (Exception e)
+ {
+ LOG.error("Error invoking callback:" + _callback, e);
+ }
+ }
+ }
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext)
+ {
+ LOG.info("onLiveInstanceChange() invoked");
+ callParticipantCode(changeContext);
+ }
+
+ @Override
+ public void onConfigChange(List<InstanceConfig> configs,
+ NotificationContext changeContext)
+ {
+ LOG.info("onConfigChange() invoked");
+ callParticipantCode(changeContext);
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ LOG.info("onExternalViewChange() invoked");
+ callParticipantCode(changeContext);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
new file mode 100644
index 0000000..ea070f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -0,0 +1,199 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.participant;
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+// TODO: merge with GenericHelixController
+/**
+ * Controller-leader election listener.
+ * <p>
+ * On INIT/CALLBACK notifications the manager tries to create the controller-leader node.
+ * If it becomes leader, it records the leader history and starts a
+ * {@link GenericHelixController}.
+ */
+public class DistClusterControllerElection implements ControllerChangeListener
+{
+  private static final Logger LOG =
+      Logger.getLogger(DistClusterControllerElection.class);
+  private final String _zkAddr;
+  private final GenericHelixController _controller = new GenericHelixController();
+  // dedicated CONTROLLER-typed manager created when a CONTROLLER_PARTICIPANT wins the
+  // election; null otherwise and after FINALIZE
+  private HelixManager _leader;
+
+  public DistClusterControllerElection(String zkAddr)
+  {
+    _zkAddr = zkAddr;
+  }
+
+  /**
+   * Handles controller-leader notifications.
+   * <p>
+   * INIT/CALLBACK: while no leader node exists, tries to become leader and, on success,
+   * starts controller duties. FINALIZE: disconnects the dedicated controller manager
+   * created for a CONTROLLER_PARTICIPANT, if any.
+   * <p>
+   * May be accessed by multiple threads (zk-client thread and
+   * ZkHelixManager.disconnect()-&gt;reset()), hence synchronized.
+   * TODO: refactor accessing HelixControllerMain class statically
+   */
+  @Override
+  public synchronized void onControllerChange(NotificationContext changeContext)
+  {
+    HelixManager manager = changeContext.getManager();
+    if (manager == null)
+    {
+      LOG.error("missing attributes in changeContext. requires HelixManager");
+      return;
+    }
+
+    InstanceType type = manager.getInstanceType();
+    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT)
+    {
+      // string concatenation (not type.toString()) so a null type cannot throw here
+      LOG.error("fail to become controller because incorrect instanceType (was "
+          + type + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+      return;
+    }
+
+    try
+    {
+      NotificationContext.Type changeType = changeContext.getType();
+      if (changeType == NotificationContext.Type.INIT
+          || changeType == NotificationContext.Type.CALLBACK)
+      {
+        HelixDataAccessor accessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = accessor.keyBuilder();
+
+        // NOTE(review): if tryUpdateController keeps failing while no leader node exists
+        // (e.g. repeated ZK errors), this loop retries without any backoff
+        while (accessor.getProperty(keyBuilder.controllerLeader()) == null)
+        {
+          if (tryUpdateController(manager))
+          {
+            updateHistory(manager);
+            startController(manager, type);
+          }
+        }
+      }
+      else if (changeType == NotificationContext.Type.FINALIZE)
+      {
+        if (_leader != null)
+        {
+          _leader.disconnect();
+          // clear the reference so a later FINALIZE does not disconnect a stale manager
+          _leader = null;
+        }
+      }
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception when trying to become leader", e);
+    }
+  }
+
+  /**
+   * Starts controller duties after winning the election.
+   * <p>
+   * A CONTROLLER manager hosts the controller listeners and timer tasks itself. A
+   * CONTROLLER_PARTICIPANT instead connects a dedicated CONTROLLER-typed manager
+   * (stored in {@code _leader}) with the same cluster, instance name and ZK address.
+   */
+  private void startController(HelixManager manager, InstanceType type) throws Exception
+  {
+    if (type == InstanceType.CONTROLLER)
+    {
+      HelixControllerMain.addListenersToController(manager, _controller);
+      manager.startTimerTasks();
+    }
+    else if (type == InstanceType.CONTROLLER_PARTICIPANT)
+    {
+      String clusterName = manager.getClusterName();
+      String controllerName = manager.getInstanceName();
+      _leader =
+          HelixManagerFactory.getZKHelixManager(clusterName,
+                                                controllerName,
+                                                InstanceType.CONTROLLER,
+                                                _zkAddr);
+
+      _leader.connect();
+      _leader.startTimerTasks();
+      HelixControllerMain.addListenersToController(_leader, _controller);
+    }
+  }
+
+  /**
+   * Tries to create the controller-leader node naming this manager's instance.
+   *
+   * @return true if this call created the node, or if an existing leader node already
+   *         names this instance; false otherwise
+   */
+  private boolean tryUpdateController(HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = new LiveInstance(manager.getInstanceName());
+    try
+    {
+      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+      // TODO: this session id is not the leader's session id in distributed mode
+      leader.setSessionId(manager.getSessionId());
+      leader.setHelixVersion(manager.getVersion());
+      // read the singleton once so the null check and the use see the same instance
+      ZKPropertyTransferServer transferServer = ZKPropertyTransferServer.getInstance();
+      if (transferServer != null)
+      {
+        String zkPropertyTransferServiceUrl = transferServer.getWebserviceUrl();
+        if (zkPropertyTransferServiceUrl != null)
+        {
+          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
+        }
+      }
+      else
+      {
+        LOG.warn("ZKPropertyTransferServer instnace is null");
+      }
+
+      if (accessor.createProperty(keyBuilder.controllerLeader(), leader))
+      {
+        return true;
+      }
+      LOG.info("Unable to become leader probably because some other controller becames the leader");
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception when trying to updating leader record in cluster:"
+                    + manager.getClusterName()
+                    + ". Need to check again whether leader node has been created or not",
+                e);
+    }
+
+    // creation failed or threw: check whether the existing leader node is ours anyway
+    leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader != null)
+    {
+      String leaderName = leader.getInstanceName();
+      LOG.info("Leader exists for cluster:" + manager.getClusterName()
+          + ", currentLeader:" + leaderName);
+
+      if (leaderName != null && leaderName.equals(manager.getInstanceName()))
+      {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Appends this instance to the cluster's controller-leader history, creating the
+   * history record if it does not exist yet.
+   */
+  private void updateHistory(HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
+    if (history == null)
+    {
+      history = new LeaderHistory(PropertyType.HISTORY.toString());
+    }
+    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
+    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
+  }
+}