You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[21/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/ParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/ParticipantMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/ParticipantMonitor.java
deleted file mode 100644
index 8c9fb4a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/ParticipantMonitor.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring;
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.monitoring.mbeans.StateTransitionStatMonitor;
-
-public class ParticipantMonitor
-{
- private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap
- = new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
- private static final Logger LOG = Logger.getLogger(ParticipantMonitor.class);
-
- private MBeanServer _beanServer;
-
- public ParticipantMonitor()
- {
- try
- {
- _beanServer = ManagementFactory.getPlatformMBeanServer();
- }
- catch(Exception e)
- {
- LOG.warn(e);
- e.printStackTrace();
- _beanServer = null;
- }
- }
-
- public void reportTransitionStat(StateTransitionContext cxt,
- StateTransitionDataPoint data)
- {
- if(_beanServer == null)
- {
- LOG.warn("bean server is null, skip reporting");
- return;
- }
- try
- {
- if(!_monitorMap.containsKey(cxt))
- {
- synchronized(this)
- {
- if(!_monitorMap.containsKey(cxt))
- {
- StateTransitionStatMonitor bean = new StateTransitionStatMonitor(cxt, TimeUnit.MILLISECONDS);
- _monitorMap.put(cxt, bean);
- String beanName = cxt.toString();
- register(bean, getObjectName(beanName));
- }
- }
- }
- _monitorMap.get(cxt).addDataPoint(data);
- }
- catch(Exception e)
- {
- LOG.warn(e);
- e.printStackTrace();
- }
- }
-
-
- private ObjectName getObjectName(String name) throws MalformedObjectNameException
- {
- LOG.info("Registering bean: "+name);
- return new ObjectName("CLMParticipantReport:"+name);
- }
-
- private void register(Object bean, ObjectName name)
- {
- if(_beanServer == null)
- {
- LOG.warn("bean server is null, skip reporting");
- return;
- }
- try
- {
- _beanServer.unregisterMBean(name);
- }
- catch (Exception e1)
- {
- // Swallow silently
- }
-
- try
- {
- _beanServer.registerMBean(bean, name);
- }
- catch (Exception e)
- {
- LOG.warn("Could not register MBean", e);
- }
- }
-
- public void shutDown()
- {
- for(StateTransitionContext cxt : _monitorMap.keySet() )
- {
- try
- {
- ObjectName name = getObjectName(cxt.toString());
- if (_beanServer.isRegistered(name))
- {
- _beanServer.unregisterMBean(name);
- }
- }
- catch (Exception e)
- {
- LOG.warn("fail to unregister " + cxt.toString(), e);
- }
- }
- _monitorMap.clear();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/SensorNameProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/SensorNameProvider.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/SensorNameProvider.java
deleted file mode 100644
index ff410c4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/SensorNameProvider.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package com.linkedin.helix.monitoring;
-
-public interface SensorNameProvider
-{
- String getSensorName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/StatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/StatCollector.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/StatCollector.java
deleted file mode 100644
index ebbb8ec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/StatCollector.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring;
-
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
-
-public class StatCollector
-{
- private static final int DEFAULT_WINDOW_SIZE = 100;
- private final DescriptiveStatistics _stats;
- private long _numDataPoints;
- private long _totalSum;
-
- public StatCollector()
- {
- _stats = new SynchronizedDescriptiveStatistics();
- _stats.setWindowSize(DEFAULT_WINDOW_SIZE);
- }
-
- public void addData(double data)
- {
- _numDataPoints++;
- _totalSum += data;
- _stats.addValue(data);
- }
-
- public long getTotalSum()
- {
- return _totalSum;
- }
-
- public DescriptiveStatistics getStatistics()
- {
- return _stats;
- }
-
- public long getNumDataPoints()
- {
- return _numDataPoints;
- }
-
- public void reset()
- {
- _numDataPoints = 0;
- _totalSum = 0;
- _stats.clear();
- }
-
- public double getMean()
- {
- if(_stats.getN() == 0)
- {
- return 0;
- }
- return _stats.getMean();
- }
-
- public double getMax()
- {
- return _stats.getMax();
- }
-
- public double getMin()
- {
- return _stats.getMin();
- }
-
- public double getPercentile(int percentage)
- {
- if(_stats.getN() == 0)
- {
- return 0;
- }
- return _stats.getPercentile(percentage*1.0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionContext.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionContext.java
deleted file mode 100644
index 4f6d17d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring;
-
-public class StateTransitionContext
-{
- private final String _resourceName;
- private final String _clusterName;
- private final String _instanceName;
- private final String _transition;
-
- public StateTransitionContext(
- String clusterName,
- String instanceName,
- String resourceName,
- String transition
- )
- {
- _clusterName = clusterName;
- _resourceName = resourceName;
- _transition = transition;
- _instanceName = instanceName;
- }
-
- public String getClusterName()
- {
- return _clusterName;
- }
-
- public String getInstanceName()
- {
- return _instanceName;
- }
-
- public String getResourceName()
- {
- return _resourceName;
- }
-
- public String getTransition()
- {
- return _transition;
- }
-
- @Override
- public boolean equals(Object other)
- {
- if(! (other instanceof StateTransitionContext))
- {
- return false;
- }
-
- StateTransitionContext otherCxt = (StateTransitionContext) other;
- return
- _clusterName.equals(otherCxt.getClusterName()) &&
- // _instanceName.equals(otherCxt.getInstanceName()) &&
- _resourceName.equals(otherCxt.getResourceName()) &&
- _transition.equals(otherCxt.getTransition()) ;
- }
-
-
- // In the report, we will gather per transition time statistics
- @Override
- public int hashCode()
- {
- return toString().hashCode();
- }
-
- public String toString()
- {
- return "Cluster=" + _clusterName + "," +
- // "instance=" + _instanceName + "," +
- "Resource=" + _resourceName +"," +
- "Transition=" + _transition;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionDataPoint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionDataPoint.java
deleted file mode 100644
index 1431c79..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/StateTransitionDataPoint.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring;
-
-public class StateTransitionDataPoint
-{
- long _totalDelay;
- long _executionDelay;
- boolean _isSuccess;
-
- public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess)
- {
- _totalDelay = totalDelay;
- _executionDelay = executionDelay;
- _isSuccess = isSuccess;
- }
-
- public long getTotalDelay()
- {
- return _totalDelay;
- }
-
- public long getExecutionDelay()
- {
- return _executionDelay;
- }
-
- public boolean getSuccess()
- {
- return _isSuccess;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/ZKPathDataDumpTask.java
deleted file mode 100644
index 8e73825..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/ZKPathDataDumpTask.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring;
-
-import java.io.StringWriter;
-import java.util.Date;
-import java.util.List;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.util.HelixUtil;
-
-public class ZKPathDataDumpTask extends TimerTask
-{
- static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
-
- private final int _thresholdNoChangeInMs;
- private final HelixManager _manager;
- private final ZkClient _zkClient;
-
- public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs)
- {
- _manager = manager;
- _zkClient = zkClient;
- logger.info("Scannning cluster statusUpdate " + manager.getClusterName()
- + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
- _thresholdNoChangeInMs = thresholdNoChangeInMs;
- }
-
- @Override
- public void run()
- {
- // For each record in status update and error node
- // TODO: for now the status updates are dumped to cluster manager log4j log.
- // We need to think if we should create per-instance log files that contains
- // per-instance statusUpdates
- // and errors
- logger.info("Scannning status updates ...");
- try
- {
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
- for (String instanceName : instances)
- {
- scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
- PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
- scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
- PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
- }
- scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
- PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
-
- scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
- PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
- } catch (Exception e)
- {
- logger.error(e);
- e.printStackTrace();
- }
- }
-
- void scanPath(String path, int thresholdNoChangeInMs)
- {
- logger.info("Scannning path " + path);
- List<String> subPaths = _zkClient.getChildren(path);
- for (String subPath : subPaths)
- {
- try
- {
- String nextPath = path + "/" + subPath;
- List<String> subSubPaths = _zkClient.getChildren(nextPath);
- for (String subsubPath : subSubPaths)
- {
- try
- {
- checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
- } catch (Exception e)
- {
- logger.error(e);
- }
- }
- } catch (Exception e)
- {
- logger.error(e);
- }
- }
- }
-
- void checkAndDump(String path, int thresholdNoChangeInMs)
- {
- List<String> subPaths = _zkClient.getChildren(path);
- if(subPaths.size() == 0)
- {
- subPaths.add("");
- }
- for (String subPath : subPaths)
- {
- String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
- Stat pathStat = _zkClient.getStat(fullPath);
-
- long lastModifiedTimeInMs = pathStat.getMtime();
- long nowInMs = new Date().getTime();
- // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
-
- // Check the last modified time
- if (nowInMs > lastModifiedTimeInMs)
- {
- long timeDiff = nowInMs - lastModifiedTimeInMs;
- if (timeDiff > thresholdNoChangeInMs)
- {
- logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- ZNRecord record = _zkClient.readData(fullPath);
-
- // dump the node content into log file
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- try
- {
- mapper.writeValue(sw, record);
- logger.info(sw.toString());
- } catch (Exception e)
- {
- logger.warn(
- "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
- e);
- }
- // Delete the leaf data
- _zkClient.deleteRecursive(fullPath);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItem.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItem.java
deleted file mode 100644
index 925f407..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItem.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-
-import java.util.Date;
-
-import com.linkedin.helix.alerts.AlertValueAndStatus;
-
-public class ClusterAlertItem implements ClusterAlertItemMBean
-{
- String _alertItemName;
- double _alertValue;
- int _alertFired;
- String _additionalInfo = "";
- AlertValueAndStatus _valueAndStatus;
- long _lastUpdateTime = 0;
-
- public ClusterAlertItem(String name, AlertValueAndStatus valueAndStatus)
- {
- _valueAndStatus = valueAndStatus;
- _alertItemName = name;
- refreshValues();
- }
- @Override
- public String getSensorName()
- {
- return _alertItemName;
- }
-
- @Override
- public double getAlertValue()
- {
- return _alertValue;
- }
-
- public void setValueMap(AlertValueAndStatus valueAndStatus)
- {
- _valueAndStatus = valueAndStatus;
- refreshValues();
- }
-
- void refreshValues()
- {
- _lastUpdateTime = new Date().getTime();
- if(_valueAndStatus.getValue().getElements().size() > 0)
- {
- _alertValue = Double.parseDouble(_valueAndStatus.getValue().getElements().get(0));
- }
- else
- {
- _alertValue = 0;
- }
- _alertFired = _valueAndStatus.isFired() ? 1 : 0;
- }
- @Override
- public int getAlertFired()
- {
- return _alertFired;
- }
-
- public void setAdditionalInfo(String additionalInfo)
- {
- _additionalInfo = additionalInfo;
- }
-
- @Override
- public String getAdditionalInfo()
- {
- return _additionalInfo;
- }
-
- public void reset()
- {
- _alertFired = 0;
- _additionalInfo = "";
- _alertValue = 0;
- }
-
- public long getLastUpdateTime()
- {
- return _lastUpdateTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItemMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItemMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItemMBean.java
deleted file mode 100644
index f88196a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertItemMBean.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-public interface ClusterAlertItemMBean
-{
- String getSensorName();
-
- double getAlertValue();
-
- int getAlertFired();
-
- String getAdditionalInfo();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
deleted file mode 100644
index 56a8613..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterAlertMBeanCollection.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.io.StringWriter;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.AlertParser;
-import com.linkedin.helix.alerts.AlertValueAndStatus;
-import com.linkedin.helix.alerts.Tuple;
-
-public class ClusterAlertMBeanCollection
-{
- public static String DOMAIN_ALERT = "HelixAlerts";
- public static String ALERT_SUMMARY = "AlertSummary";
-
- private static final Logger _logger = Logger.getLogger(ClusterAlertMBeanCollection.class);
- ConcurrentHashMap<String, ClusterAlertItem> _alertBeans
- = new ConcurrentHashMap<String, ClusterAlertItem>();
-
- Map<String, String> _recentAlertDelta;
- ClusterAlertSummary _clusterAlertSummary;
- ZNRecord _alertHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
- Set<String> _previousFiredAlerts = new HashSet<String>();
- // 5 min for mbean freshness threshold
- public static final long ALERT_NOCHANGE_THRESHOLD = 5 * 60 * 1000;
-
- final MBeanServer _beanServer;
-
- public interface ClusterAlertSummaryMBean extends ClusterAlertItemMBean
- {
- public String getAlertFiredHistory();
- }
-
- class ClusterAlertSummary extends ClusterAlertItem implements ClusterAlertSummaryMBean
- {
- public ClusterAlertSummary(String name, AlertValueAndStatus valueAndStatus)
- {
- super(name, valueAndStatus);
- }
- /**
- * Returns the previous 100 alert mbean turn on / off history
- * */
- @Override
- public String getAlertFiredHistory()
- {
- try
- {
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, _alertHistory);
- return sw.toString();
- }
- catch(Exception e)
- {
- _logger.warn("", e);
- return "";
- }
- }
- }
-
-
- public ClusterAlertMBeanCollection()
- {
- _beanServer = ManagementFactory.getPlatformMBeanServer();
- }
-
- public Collection<ClusterAlertItemMBean> getCurrentAlertMBeans()
- {
- ArrayList<ClusterAlertItemMBean> beans = new ArrayList<ClusterAlertItemMBean>();
- for(ClusterAlertItem item : _alertBeans.values())
- {
- beans.add(item);
- }
- return beans;
- }
-
- void onNewAlertMbeanAdded(ClusterAlertItemMBean bean)
- {
- try
- {
- _logger.info("alert bean " + bean.getSensorName()+" exposed to jmx");
- System.out.println("alert bean " + bean.getSensorName()+" exposed to jmx");
- ObjectName objectName = new ObjectName(DOMAIN_ALERT+":alert="+bean.getSensorName());
- register(bean, objectName);
- }
- catch (Exception e)
- {
- _logger.error("", e);
- e.printStackTrace();
- }
- }
-
- public void setAlerts(String originAlert, Map<String, AlertValueAndStatus> alertResultMap, String clusterName)
- {
- if(alertResultMap == null)
- {
- _logger.warn("null alertResultMap");
- return;
- }
- for(String alertName : alertResultMap.keySet())
- {
- String beanName = "";
- if(alertName.length() > 1)
- {
- String comparator = AlertParser.getComponent(AlertParser.COMPARATOR_NAME, originAlert);
- String constant = AlertParser.getComponent(AlertParser.CONSTANT_NAME, originAlert);
- beanName = "("+ alertName+")" + comparator + "("+constant+")";
- }
- else
- {
- beanName = originAlert + "--(" + alertName + ")";
- }
- // This is to make JMX happy; certain charaters cannot be in JMX bean name
- beanName = beanName.replace('*', '%').replace('=', '#').replace(',', ';');
- if(!_alertBeans.containsKey(beanName))
- {
- ClusterAlertItem item = new ClusterAlertItem(beanName, alertResultMap.get(alertName));
- onNewAlertMbeanAdded(item);
- _alertBeans.put(beanName, item);
- }
- else
- {
- _alertBeans.get(beanName).setValueMap(alertResultMap.get(alertName));
- }
- }
- refreshSummayAlert(clusterName);
- }
-
- public void setAlertHistory(ZNRecord alertHistory)
- {
- _alertHistory = alertHistory;
- }
- /**
- * The summary alert is a combination of all alerts, if it is on, something is wrong on this
- * cluster. The additional info contains all alert mbean names that has been fired.
- */
- void refreshSummayAlert(String clusterName)
- {
- boolean fired = false;
- String alertsFired = "";
- String summaryKey = ALERT_SUMMARY + "_" + clusterName;
- for(String key : _alertBeans.keySet())
- {
- if(!key.equals(summaryKey))
- {
- ClusterAlertItem item = _alertBeans.get(key);
- fired = (item.getAlertFired() == 1) | fired;
- if(item.getAlertFired() == 1)
- {
- alertsFired += item._alertItemName;
- alertsFired += ";";
- }
- }
- }
- Tuple<String> t = new Tuple<String>();
- t.add("0");
- AlertValueAndStatus summaryStatus = new AlertValueAndStatus(t, fired);
- if(!_alertBeans.containsKey(summaryKey))
- {
- ClusterAlertSummary item = new ClusterAlertSummary(summaryKey, summaryStatus);
- onNewAlertMbeanAdded(item);
- item.setAdditionalInfo(alertsFired);
- _alertBeans.put(summaryKey, item);
- _clusterAlertSummary = item;
- }
- else
- {
- _alertBeans.get(summaryKey).setValueMap(summaryStatus);
- _alertBeans.get(summaryKey).setAdditionalInfo(alertsFired);
- }
- }
-
- void register(Object bean, ObjectName name)
- {
- try
- {
- _beanServer.unregisterMBean(name);
- }
- catch (Exception e)
- {
- }
- try
- {
- _beanServer.registerMBean(bean, name);
- }
- catch (Exception e)
- {
- _logger.error("Could not register MBean", e);
- }
- }
-
- public void reset()
- {
- for(String beanName : _alertBeans.keySet())
- {
- ClusterAlertItem item = _alertBeans.get(beanName);
- item.reset();
- try
- {
- ObjectName objectName = new ObjectName(DOMAIN_ALERT+":alert="+item.getSensorName());
- _beanServer.unregisterMBean(objectName);
- }
- catch (Exception e)
- {
- _logger.warn("", e);
- }
- }
- _alertBeans.clear();
- }
-
- public void refreshAlertDelta(String clusterName)
- {
- // Update the alert turn on/turn off history
- String summaryKey = ALERT_SUMMARY + "_" + clusterName;
- Set<String> currentFiredAlerts = new HashSet<String>();
- for(String key : _alertBeans.keySet())
- {
- if(!key.equals(summaryKey))
- {
- ClusterAlertItem item = _alertBeans.get(key);
- if(item.getAlertFired() == 1)
- {
- currentFiredAlerts.add(item._alertItemName);
- }
- }
- }
-
- Map<String, String> onOffAlertsMap = new HashMap<String, String>();
- for(String alertName : currentFiredAlerts)
- {
- if(!_previousFiredAlerts.contains(alertName))
- {
- onOffAlertsMap.put(alertName, "ON");
- _logger.info(alertName + " ON");
- _previousFiredAlerts.add(alertName);
- }
- }
- for(String cachedAlert : _previousFiredAlerts)
- {
- if(!currentFiredAlerts.contains(cachedAlert))
- {
- onOffAlertsMap.put(cachedAlert, "OFF");
- _logger.info(cachedAlert + " OFF");
- }
- }
- for(String key : onOffAlertsMap.keySet())
- {
- if(onOffAlertsMap.get(key).equals("OFF"))
- {
- _previousFiredAlerts.remove(key);
- }
- }
- if(onOffAlertsMap.size() == 0)
- {
- _logger.info("No MBean change");
- }
- _recentAlertDelta = onOffAlertsMap;
-
- checkMBeanFreshness(ALERT_NOCHANGE_THRESHOLD);
- }
-
- public Map<String, String> getRecentAlertDelta()
- {
- return _recentAlertDelta;
- }
-
- /**
- * Remove mbeans that has not been changed for thresholdInMs MS
- * */
- void checkMBeanFreshness(long thresholdInMs)
- {
- long now = new Date().getTime();
- Set<String> oldBeanNames = new HashSet<String>();
- // Get mbean items that has not been updated for thresholdInMs
- for(String beanName : _alertBeans.keySet())
- {
- ClusterAlertItem item = _alertBeans.get(beanName);
- if(now - item.getLastUpdateTime() > thresholdInMs)
- {
- oldBeanNames.add(beanName);
- _logger.info("bean " + beanName+" has not been updated for "+ thresholdInMs +" MS");
- }
- }
- for(String beanName : oldBeanNames)
- {
- ClusterAlertItem item = _alertBeans.get(beanName);
- _alertBeans.remove(beanName);
- try
- {
- item.reset();
- ObjectName objectName = new ObjectName(DOMAIN_ALERT+":alert="+item.getSensorName());
- _beanServer.unregisterMBean(objectName);
- }
- catch (Exception e)
- {
- _logger.warn("", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterMBeanObserver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterMBeanObserver.java
deleted file mode 100644
index f4db78b..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterMBeanObserver.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.IntrospectionException;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerDelegate;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.ReflectionException;
-import javax.management.relation.MBeanServerNotificationFilter;
-
-import org.apache.log4j.Logger;
-
-/*
- * TODO: this class should be in espresso common, as the only usage of it is
- * to create ingraph adaptors
- * **/
-public abstract class ClusterMBeanObserver implements NotificationListener
-{
- protected final String _domain;
- protected MBeanServerConnection _server;
- private static final Logger _logger = Logger.getLogger(ClusterMBeanObserver.class);
-
- public ClusterMBeanObserver(String domain)
- throws InstanceNotFoundException, IOException, MalformedObjectNameException, NullPointerException
- {
- // Get a reference to the target MBeanServer
- _domain = domain;
- _server = ManagementFactory.getPlatformMBeanServer();
- MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
- filter.enableAllObjectNames();
- _server.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
- }
-
- public void handleNotification(Notification notification, Object handback)
- {
- MBeanServerNotification mbs = (MBeanServerNotification) notification;
- if(MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(mbs.getType()))
- {
- if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
- {
- _logger.info("MBean Registered, name :" + mbs.getMBeanName());
- onMBeanRegistered(_server, mbs);
- }
- }
- else if(MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(mbs.getType()))
- {
- if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
- {
- _logger.info("MBean Unregistered, name :" + mbs.getMBeanName());
- onMBeanUnRegistered(_server, mbs);
- }
- }
- }
-
- public void disconnect()
- {
- MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
- try
- {
- _server.removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this);
- }
- catch (Exception e)
- {
- _logger.error("", e);
- }
- }
-
- public abstract void onMBeanRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);
-
- public abstract void onMBeanUnRegistered(MBeanServerConnection server, MBeanServerNotification mbsNotification);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitor.java
deleted file mode 100644
index 9af44be..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.lang.management.ManagementFactory;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-
-public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
-{
- private static final Logger LOG =
- Logger.getLogger(ClusterStatusMonitor.class);
-
- static final String CLUSTER_STATUS_KEY =
- "ClusterStatus";
- static final String MESSAGE_QUEUE_STATUS_KEY =
- "MessageQueueStatus";
- static final String RESOURCE_STATUS_KEY =
- "ResourceStatus";
- static final String CLUSTER_DN_KEY =
- "cluster";
- static final String RESOURCE_DN_KEY =
- "resourceName";
- static final String INSTANCE_DN_KEY =
- "instanceName";
-
- private final String _clusterName;
- private final MBeanServer _beanServer;
-
- private int _numOfLiveInstances =
- 0;
- private int _numOfInstances =
- 0;
- private int _numOfDisabledInstances =
- 0;
- private int _numOfDisabledPartitions =
- 0;
-
- private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
- new ConcurrentHashMap<String, ResourceMonitor>();
-
- private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
- new ConcurrentHashMap<String, MessageQueueMonitor>();
-
- public ClusterStatusMonitor(String clusterName)
- {
- _clusterName = clusterName;
- _beanServer = ManagementFactory.getPlatformMBeanServer();
- try
- {
- register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
- }
- catch (Exception e)
- {
- LOG.error("Register self failed.", e);
- }
- }
-
- public ObjectName getObjectName(String name) throws MalformedObjectNameException
- {
- return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
- }
-
- // Used by other external JMX consumers like ingraph
- public String getBeanName()
- {
- return CLUSTER_STATUS_KEY + " " + _clusterName;
- }
-
- @Override
- public long getDownInstanceGauge()
- {
- return _numOfInstances - _numOfLiveInstances;
- }
-
- @Override
- public long getInstancesGauge()
- {
- return _numOfInstances;
- }
-
- @Override
- public long getDisabledInstancesGauge()
- {
- return _numOfDisabledInstances;
- }
-
- @Override
- public long getDisabledPartitionsGauge()
- {
- return _numOfDisabledPartitions;
- }
-
- @Override
- public long getMaxMessageQueueSizeGauge()
- {
- long maxQueueSize = 0;
- for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values())
- {
- if (msgQueue.getMaxMessageQueueSize() > maxQueueSize)
- {
- maxQueueSize = (long)msgQueue.getMaxMessageQueueSize();
- }
- }
-
- return maxQueueSize;
- }
-
- @Override
- public String getMessageQueueSizes()
- {
- Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
- for (String instance : _instanceMsgQueueMbeanMap.keySet())
- {
- MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
- msgQueueSizes.put(instance, new Long( (long)msgQueue.getMaxMessageQueueSize()));
- }
-
- return msgQueueSizes.toString();
- }
-
- private void register(Object bean, ObjectName name)
- {
- try
- {
- if (_beanServer.isRegistered(name))
- {
- _beanServer.unregisterMBean(name);
- }
- }
- catch (Exception e)
- {
- // OK
- }
-
- try
- {
- LOG.info("Registering " + name.toString());
- _beanServer.registerMBean(bean, name);
- }
- catch (Exception e)
- {
- LOG.warn("Could not register MBean" + name, e);
- }
- }
-
- private void unregister(ObjectName name)
- {
- try
- {
- if (_beanServer.isRegistered(name))
- {
- LOG.info("Unregistering " + name.toString());
- _beanServer.unregisterMBean(name);
- }
- }
- catch (Exception e)
- {
- LOG.warn("Could not unregister MBean" + name, e);
- }
- }
-
- public void setClusterStatusCounters(int numberLiveInstances,
- int numberOfInstances,
- int disabledInstances,
- int disabledPartitions)
- {
- _numOfInstances = numberOfInstances;
- _numOfLiveInstances = numberLiveInstances;
- _numOfDisabledInstances = disabledInstances;
- _numOfDisabledPartitions = disabledPartitions;
- }
-
- public void onExternalViewChange(ExternalView externalView, IdealState idealState)
- {
- try
- {
- String resourceName = externalView.getId();
- if (!_resourceMbeanMap.containsKey(resourceName))
- {
- synchronized (this)
- {
- if (!_resourceMbeanMap.containsKey(resourceName))
- {
- ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
- String beanName =
- CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
- + resourceName;
- register(bean, getObjectName(beanName));
- _resourceMbeanMap.put(resourceName, bean);
- }
- }
- }
- _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
- }
- catch (Exception e)
- {
- LOG.warn(e);
- }
- }
-
- public void addMessageQueueSize(String instanceName, int msgQueueSize)
- {
- try
- {
- if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
- {
- synchronized (this)
- {
- if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
- {
- MessageQueueMonitor bean =
- new MessageQueueMonitor(_clusterName, instanceName);
- _instanceMsgQueueMbeanMap.put(instanceName, bean);
- }
- }
- }
- _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
- }
- catch (Exception e)
- {
- LOG.warn("fail to add message queue size to mbean", e);
- }
- }
-
- public void reset()
- {
- LOG.info("Resetting ClusterStatusMonitor");
- try
- {
- for (String resourceName : _resourceMbeanMap.keySet())
- {
- String beanName =
- CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
- + resourceName;
- unregister(getObjectName(beanName));
- }
- _resourceMbeanMap.clear();
-
- for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values())
- {
- bean.reset();
- }
- _instanceMsgQueueMbeanMap.clear();
-
- unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
- }
- catch (Exception e)
- {
- LOG.error("fail to reset ClusterStatusMonitor", e);
- }
- }
-
- @Override
- public String getSensorName()
- {
- return CLUSTER_STATUS_KEY + "_" + _clusterName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
deleted file mode 100644
index 83f208c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import com.linkedin.helix.monitoring.SensorNameProvider;
-
-public interface ClusterStatusMonitorMBean extends SensorNameProvider
-{
- public long getDownInstanceGauge();
-
- public long getInstancesGauge();
-
- public long getDisabledInstancesGauge();
-
- public long getDisabledPartitionsGauge();
-
- /**
- * The max message queue size across all instances, including the controller.
- * This value is reported to ingraph.
- * @return
- */
- public long getMaxMessageQueueSizeGauge();
-
- /**
- * Get all message queue sizes as a string.
- * This value is NOT reported to ingraph.
- * @return
- */
- public String getMessageQueueSizes();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
deleted file mode 100644
index 05b4407..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.lang.management.ManagementFactory;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.monitoring.StatCollector;
-
-public class HelixStageLatencyMonitor implements HelixStageLatencyMonitorMBean
-{
- private static final Logger LOG = Logger.getLogger(HelixStageLatencyMonitor.class);
-
- private final StatCollector _stgLatency;
- private final MBeanServer _beanServer;
- private final String _clusterName;
- private final String _stageName;
- private final ObjectName _objectName;
-
- public HelixStageLatencyMonitor(String clusterName, String stageName) throws Exception
- {
- _clusterName = clusterName;
- _stageName = stageName;
- _stgLatency = new StatCollector();
- _beanServer = ManagementFactory.getPlatformMBeanServer();
- _objectName = new ObjectName("StageLatencyMonitor: " + "cluster=" + _clusterName + ",stage=" + _stageName);
- try
- {
- register(this, _objectName);
- }
- catch (Exception e)
- {
- LOG.error("Couldn't register " + _objectName + " mbean", e);
- throw e;
- }
- }
-
- private void register(Object bean, ObjectName name) throws Exception
- {
- try
- {
- _beanServer.unregisterMBean(name);
- }
- catch (Exception e)
- {
- // OK
- }
-
- _beanServer.registerMBean(bean, name);
- }
-
- private void unregister(ObjectName name)
- {
- try
- {
- if (_beanServer.isRegistered(name))
- {
- _beanServer.unregisterMBean(name);
- }
- }
- catch (Exception e)
- {
- LOG.error("Couldn't unregister " + _objectName + " mbean", e);
- }
- }
-
- public void addStgLatency(long time)
- {
- _stgLatency.addData(time);
- }
-
- public void reset()
- {
- _stgLatency.reset();
- unregister(_objectName);
- }
-
- @Override
- public long getMaxStgLatency()
- {
- return (long) _stgLatency.getMax();
- }
-
- @Override
- public long getMeanStgLatency()
- {
- return (long) _stgLatency.getMean();
- }
-
- @Override
- public long get95StgLatency()
- {
- return (long) _stgLatency.getPercentile(95);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
deleted file mode 100644
index 315a484..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/HelixStageLatencyMonitorMBean.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-public interface HelixStageLatencyMonitorMBean
-{
- public long getMaxStgLatency();
-
- public long getMeanStgLatency();
-
- public long get95StgLatency();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitor.java
deleted file mode 100644
index ec63a0a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.linkedin.helix.monitoring.mbeans;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.monitoring.StatCollector;
-
-public class MessageQueueMonitor implements MessageQueueMonitorMBean
-{
- private static final Logger LOG = Logger.getLogger(MessageQueueMonitor.class);
-
- private final StatCollector _messageQueueSizeStat;
- private final String _clusterName;
- private final String _instanceName;
-
- public MessageQueueMonitor(String clusterName, String instanceName)
- {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _messageQueueSizeStat = new StatCollector();
- }
-
-
- public void addMessageQueueSize(long size)
- {
- _messageQueueSizeStat.addData(size);
- }
-
- public void reset()
- {
- _messageQueueSizeStat.reset();
- }
-
- @Override
- public double getMaxMessageQueueSize()
- {
- return _messageQueueSizeStat.getMax();
- }
-
- @Override
- public double getMeanMessageQueueSize()
- {
- return _messageQueueSizeStat.getMean();
- }
-
- @Override
- public String getSensorName()
- {
- return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "_" + _clusterName + "_"
- + _instanceName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitorMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
deleted file mode 100644
index 9270632..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.linkedin.helix.monitoring.mbeans;
-
-import com.linkedin.helix.monitoring.SensorNameProvider;
-
-public interface MessageQueueMonitorMBean extends SensorNameProvider
-{
- /**
- * Get the max message queue size
- * @return
- */
- public double getMaxMessageQueueSize();
-
- /**
- * Get the mean message queue size
- * @return
- */
- public double getMeanMessageQueueSize();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitor.java
deleted file mode 100644
index 19c8bb3..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-
-public class ResourceMonitor implements ResourceMonitorMBean
-{
- int _numOfPartitions;
- int _numOfPartitionsInExternalView;
- int _numOfErrorPartitions;
- int _externalViewIdealStateDiff;
- private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
-
- String _resourceName, _clusterName;
-
- public ResourceMonitor(String clusterName, String resourceName)
- {
- _clusterName = clusterName;
- _resourceName = resourceName;
- }
-
- @Override
- public long getPartitionGauge()
- {
- return _numOfPartitions;
- }
-
- @Override
- public long getErrorPartitionGauge()
- {
- return _numOfErrorPartitions;
- }
-
- @Override
- public long getDifferenceWithIdealStateGauge()
- {
- return _externalViewIdealStateDiff;
- }
-
- @Override
- public String getSensorName()
- {
- return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "_" + _clusterName + "_"
- + _resourceName;
- }
-
- public void updateExternalView(ExternalView externalView, IdealState idealState)
- {
- if (externalView == null)
- {
- LOG.warn("external view is null");
- return;
- }
- String resourceName = externalView.getId();
-
- if (idealState == null)
- {
- LOG.warn("ideal state is null for " + resourceName);
- _numOfErrorPartitions = 0;
- _externalViewIdealStateDiff = 0;
- _numOfPartitionsInExternalView = 0;
- return;
- }
-
- assert (resourceName.equals(idealState.getId()));
-
- int numOfErrorPartitions = 0;
- int numOfDiff = 0;
-
- if (_numOfPartitions == 0)
- {
- _numOfPartitions = idealState.getRecord().getMapFields().size();
- }
-
- // TODO fix this; IdealState shall have either map fields (CUSTOM mode)
- // or list fields (AUTO mode)
- for (String partitionName : idealState.getRecord().getMapFields().keySet())
- {
- Map<String, String> idealRecord = idealState.getInstanceStateMap(partitionName);
- Map<String, String> externalViewRecord = externalView.getStateMap(partitionName);
-
- if (externalViewRecord == null)
- {
- numOfDiff += idealRecord.size();
- continue;
- }
- for (String host : idealRecord.keySet())
- {
- if (!externalViewRecord.containsKey(host)
- || !externalViewRecord.get(host).equals(idealRecord.get(host)))
- {
- numOfDiff++;
- }
- }
-
- for (String host : externalViewRecord.keySet())
- {
- if (externalViewRecord.get(host).equalsIgnoreCase("ERROR"))
- {
- numOfErrorPartitions++;
- }
- }
- }
- _numOfErrorPartitions = numOfErrorPartitions;
- _externalViewIdealStateDiff = numOfDiff;
- _numOfPartitionsInExternalView = externalView.getPartitionSet().size();
- }
-
- @Override
- public long getExternalViewPartitionGauge()
- {
- return _numOfPartitionsInExternalView;
- }
-
- public String getBeanName()
- {
- return _clusterName + " " + _resourceName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitorMBean.java
deleted file mode 100644
index a76caf8..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/ResourceMonitorMBean.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import com.linkedin.helix.monitoring.SensorNameProvider;
-
-public interface ResourceMonitorMBean extends SensorNameProvider
-{
- public long getPartitionGauge();
-
- public long getErrorPartitionGauge();
-
- public long getDifferenceWithIdealStateGauge();
-
- public long getExternalViewPartitionGauge();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitor.java
deleted file mode 100644
index 588c649..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-
-import com.linkedin.helix.monitoring.StatCollector;
-import com.linkedin.helix.monitoring.StateTransitionContext;
-import com.linkedin.helix.monitoring.StateTransitionDataPoint;
-
-public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean
-{
- public enum LATENCY_TYPE {TOTAL, EXECUTION};
-
- private static final int DEFAULT_WINDOW_SIZE = 4000;
- private long _numDataPoints;
- private long _successCount;
- private TimeUnit _unit;
-
- private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap
- = new ConcurrentHashMap<LATENCY_TYPE, StatCollector>();
-
- StateTransitionContext _context;
-
- public StateTransitionStatMonitor(StateTransitionContext context, TimeUnit unit)
- {
- _context = context;
- _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector());
- _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector());
- reset();
- }
-
- public StateTransitionContext getContext()
- {
- return _context;
- }
-
- public String getBeanName()
- {
- return _context.getClusterName()+" "+_context.getResourceName()+" "+_context.getTransition();
- }
-
- public void addDataPoint(StateTransitionDataPoint data)
- {
- _numDataPoints++;
- if(data.getSuccess())
- {
- _successCount++;
- }
- // should we count only the transition time for successful transitions?
- addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
- addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
- }
-
- void addLatency(LATENCY_TYPE type, double latency)
- {
- assert(_monitorMap.containsKey(type));
- _monitorMap.get(type).addData(latency);
- }
-
- public long getNumDataPoints()
- {
- return _numDataPoints;
- }
-
- public void reset()
- {
- _numDataPoints = 0;
- _successCount = 0;
- for(StatCollector monitor : _monitorMap.values())
- {
- monitor.reset();
- }
- }
-
- @Override
- public long getTotalStateTransitionGauge()
- {
- return _numDataPoints;
- }
-
- @Override
- public long getTotalFailedTransitionGauge()
- {
- return _numDataPoints - _successCount;
- }
-
- @Override
- public long getTotalSuccessTransitionGauge()
- {
- return _successCount;
- }
-
- @Override
- public double getMeanTransitionLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.TOTAL).getMean();
- }
-
- @Override
- public double getMaxTransitionLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.TOTAL).getMax();
- }
-
- @Override
- public double getMinTransitionLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.TOTAL).getMin();
- }
-
- @Override
- public double getPercentileTransitionLatency(int percentage)
- {
- return _monitorMap.get(LATENCY_TYPE.TOTAL).getPercentile(percentage);
- }
-
- @Override
- public double getMeanTransitionExecuteLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMean();
- }
-
- @Override
- public double getMaxTransitionExecuteLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMax();
- }
-
- @Override
- public double getMinTransitionExecuteLatency()
- {
- return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMin();
- }
-
- @Override
- public double getPercentileTransitionExecuteLatency(int percentage)
- {
- return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
deleted file mode 100644
index 7800c19..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.monitoring.mbeans;
-
-
-/**
- * Read-only view of state-transition statistics: three transition counters plus
- * mean / max / min / percentile figures for two latency series (overall
- * transition latency and execution latency), and a {@link #reset()} operation.
- * <p>
- * NOTE(review): the latency unit is not stated anywhere in this interface -
- * confirm it against the code that records the data points.
- */
-public interface StateTransitionStatMonitorMBean
-{
- /** @return the total number of state transitions counted so far */
- long getTotalStateTransitionGauge();
-
- /** @return the number of state transitions counted as failed */
- long getTotalFailedTransitionGauge();
-
- /** @return the number of state transitions counted as successful */
- long getTotalSuccessTransitionGauge();
-
- /** @return the mean of the overall transition latency */
- double getMeanTransitionLatency();
-
- /** @return the maximum of the overall transition latency */
- double getMaxTransitionLatency();
-
- /** @return the minimum of the overall transition latency */
- double getMinTransitionLatency();
-
- /**
-  * @param percentage the percentile to report; presumably 0-100 - TODO confirm
-  *                   the accepted range with the implementation
-  * @return the requested percentile of the overall transition latency
-  */
- double getPercentileTransitionLatency(int percentage);
-
- /** @return the mean of the transition execution latency */
- double getMeanTransitionExecuteLatency();
-
- /** @return the maximum of the transition execution latency */
- double getMaxTransitionExecuteLatency();
-
- /** @return the minimum of the transition execution latency */
- double getMinTransitionExecuteLatency();
-
- /**
-  * @param percentage the percentile to report; presumably 0-100 - TODO confirm
-  *                   the accepted range with the implementation
-  * @return the requested percentile of the transition execution latency
-  */
- double getPercentileTransitionExecuteLatency(int percentage);
-
- /**
-  * Resets the monitor.
-  * NOTE(review): presumably clears the accumulated counters and latency
-  * samples - the implementation is not visible here, confirm before relying on it.
-  */
- void reset();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/package-info.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/package-info.java
deleted file mode 100644
index 58eb6d5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/mbeans/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix JMX bean classes
- *
- */
-package com.linkedin.helix.monitoring.mbeans;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/monitoring/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/monitoring/package-info.java b/helix-core/src/main/java/com/linkedin/helix/monitoring/package-info.java
deleted file mode 100644
index b6f1652..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/monitoring/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix monitoring classes
- *
- */
-package com.linkedin.helix.monitoring;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/package-info.java b/helix-core/src/main/java/com/linkedin/helix/package-info.java
deleted file mode 100644
index 05b8846..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Provide the classes necessary to create a Helix cluster manager
- * <p>
- * General flow
- * <blockquote>
- * <pre>
- * manager = HelixManagerFactory.getManagerForROLE(); ROLE can be participant, spectator or a controller<br/>
- * manager.connect();
- * manager.addSOMEListener();
- * After connect(), all subsequent interactions happen through the listeners' onChange callbacks.
- * There are 3 scenarios for an onChange callback, distinguished by NotificationContext.getType():
- * INIT -&gt; invoked the first time the listener is added
- * CALLBACK -&gt; invoked due to a data change in the property value
- * FINALIZE -&gt; invoked when the listener is removed or the session expires
- *
- * manager.disconnect()
- * </pre>
- *
- * </blockquote>
- *
- * Default implementations available
- *
- * @see com.linkedin.helix.participant.HelixStateMachineEngine for participant
- * @see RoutingTableProvider for spectator
- * @see GenericHelixController for controller
- *
- * @author kgopalak
- */
-package com.linkedin.helix;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeCallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeCallbackHandler.java
deleted file mode 100644
index 9d9edb8..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeCallbackHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.participant;
-
-import com.linkedin.helix.NotificationContext;
-
-/**
- * Callback interface for running custom code on a Helix participant manager.
- * The custom code is triggered on user-specified cluster changes.
- * <p>
- * {@link CustomCodeInvoker} is the caller visible in this package: it forwards
- * live-instance, config and external-view change notifications to
- * {@link #onCallback(NotificationContext)}.
- */
-public interface CustomCodeCallbackHandler
-{
- /**
-  * Invoked when a cluster change the handler was registered for is reported.
-  * <p>
-  * When called through {@link CustomCodeInvoker}, the context type is either
-  * INIT or CALLBACK, and any {@link Exception} thrown from this method is
-  * caught and logged by the invoker rather than propagated.
-  *
-  * @param context the notification context describing the change; gives access
-  *                to the notification type and the originating manager
-  */
- public void onCallback(NotificationContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeInvoker.java
deleted file mode 100644
index 0c3eef5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/participant/CustomCodeInvoker.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.participant;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-
-/**
- * Listener adapter that forwards live-instance, config and external-view change
- * notifications to a user-supplied {@link CustomCodeCallbackHandler}.
- * <p>
- * INIT notifications are always forwarded. CALLBACK notifications are forwarded
- * only while this instance's current state for the configured partition key is
- * LEADER (compared case-insensitively). Every other notification type is ignored.
- */
-public class CustomCodeInvoker implements
-    LiveInstanceChangeListener,
-    ConfigChangeListener,
-    ExternalViewChangeListener
-{
-  private static final Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
-
-  private final CustomCodeCallbackHandler _callback;
-  private final String _partitionKey;
-
-  /**
-   * @param callback     handler that receives the forwarded notifications
-   * @param partitionKey partition key of the form "PARTICIPANT_LEADER_XXX_0";
-   *                     the text before the last underscore is used as the
-   *                     resource name when checking leadership
-   */
-  public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey)
-  {
-    _callback = callback;
-    _partitionKey = partitionKey;
-  }
-
-  /**
-   * Forwards the notification to the user callback when its type is INIT, or
-   * when it is CALLBACK and this instance is currently the leader. Exceptions
-   * thrown by the callback are logged and not propagated.
-   *
-   * @param context the notification being dispatched
-   */
-  private void callParticipantCode(NotificationContext context)
-  {
-    Type type = context.getType();
-    if (type != Type.INIT && type != Type.CALLBACK)
-    {
-      return;
-    }
-
-    // since ZkClient.unsubscribe() does not immediately remove listeners
-    // from zk, it is possible that two listeners exist when leadership transfers
-    // therefore, double check to make sure only one participant invokes the code
-    if (type == Type.CALLBACK && !isLeader(context.getManager()))
-    {
-      return;
-    }
-
-    try
-    {
-      _callback.onCallback(context);
-    }
-    catch (Exception e)
-    {
-      LOG.error("Error invoking callback:" + _callback, e);
-    }
-  }
-
-  /**
-   * Checks whether the current state recorded for this instance and session
-   * marks the configured partition as LEADER.
-   *
-   * @param manager the manager that delivered the notification
-   * @return {@code true} only if a current state exists for the resource and its
-   *         state for the partition key equals "LEADER" ignoring case;
-   *         {@code false} otherwise, including when the resource name cannot be
-   *         derived from the partition key
-   */
-  private boolean isLeader(HelixManager manager)
-  {
-    // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
-    // A key that is null or has no '_' would make substring() throw out of the
-    // listener; leadership cannot be verified then, so log it and skip the callback.
-    int separator = (_partitionKey == null) ? -1 : _partitionKey.lastIndexOf('_');
-    if (separator < 0)
-    {
-      LOG.error("Cannot derive resource name from partition key:" + _partitionKey);
-      return false;
-    }
-    String resourceName = _partitionKey.substring(0, separator);
-
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-    String instance = manager.getInstanceName();
-    String sessionId = manager.getSessionId();
-
-    CurrentState curState =
-        accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
-    if (curState == null)
-    {
-      return false;
-    }
-
-    String state = curState.getState(_partitionKey);
-    return state != null && state.equalsIgnoreCase("LEADER");
-  }
-
-  @Override
-  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
-                                   NotificationContext changeContext)
-  {
-    LOG.info("onLiveInstanceChange() invoked");
-    callParticipantCode(changeContext);
-  }
-
-  @Override
-  public void onConfigChange(List<InstanceConfig> configs,
-                             NotificationContext changeContext)
-  {
-    LOG.info("onConfigChange() invoked");
-    callParticipantCode(changeContext);
-  }
-
-  @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-                                   NotificationContext changeContext)
-  {
-    LOG.info("onExternalViewChange() invoked");
-    callParticipantCode(changeContext);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/com/linkedin/helix/participant/DistClusterControllerElection.java
deleted file mode 100644
index ed253d0..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/participant/DistClusterControllerElection.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.participant;
-
-import java.lang.management.ManagementFactory;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.controller.GenericHelixController;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.model.LeaderHistory;
-import com.linkedin.helix.model.LiveInstance;
-
-// TODO: merge with GenericHelixController
-/**
- * Controller-change listener that tries to make the local instance the cluster's
- * controller leader and, once it is, attaches the controller listeners.
- * <p>
- * For a CONTROLLER instance the listeners are added to the notifying manager
- * itself. For a CONTROLLER_PARTICIPANT instance a separate CONTROLLER manager is
- * created, connected and kept in {@code _leader} until a FINALIZE notification
- * disconnects it.
- */
-public class DistClusterControllerElection implements ControllerChangeListener
-{
-  private static final Logger LOG =
-      Logger.getLogger(DistClusterControllerElection.class);
-  private final String _zkAddr;
-  private final GenericHelixController _controller = new GenericHelixController();
-  private HelixManager _leader;
-
-  /**
-   * @param zkAddr ZooKeeper address passed to the CONTROLLER manager that is
-   *               created when a CONTROLLER_PARTICIPANT instance becomes leader
-   */
-  public DistClusterControllerElection(String zkAddr)
-  {
-    _zkAddr = zkAddr;
-  }
-
-  /**
-   * Handles a controller-change notification: INIT and CALLBACK trigger a
-   * leadership attempt, FINALIZE releases the manager created for leadership.
-   * Any exception raised while doing so is logged and not propagated.
-   * <p>
-   * May be accessed by multiple threads: the zk-client thread and
-   * ZkHelixManager.disconnect()->reset(), hence synchronized.
-   * TODO: refactor the static access to HelixControllerMain
-   *
-   * @param changeContext the notification; must carry a HelixManager whose
-   *                      instance type is CONTROLLER or CONTROLLER_PARTICIPANT,
-   *                      otherwise an error is logged and nothing happens
-   */
-  @Override
-  public synchronized void onControllerChange(NotificationContext changeContext)
-  {
-    HelixManager manager = changeContext.getManager();
-    if (manager == null)
-    {
-      LOG.error("missing attributes in changeContext. requires HelixManager");
-      return;
-    }
-
-    InstanceType type = manager.getInstanceType();
-    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT)
-    {
-      // plain concatenation renders a null type as "null" instead of throwing
-      LOG.error("fail to become controller because incorrect instanceType (was "
-          + type + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
-      return;
-    }
-
-    try
-    {
-      NotificationContext.Type changeType = changeContext.getType();
-      if (changeType == NotificationContext.Type.INIT
-          || changeType == NotificationContext.Type.CALLBACK)
-      {
-        acquireLeadership(manager, type);
-      }
-      else if (changeType == NotificationContext.Type.FINALIZE)
-      {
-        releaseLeadership();
-      }
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when trying to become leader", e);
-    }
-  }
-
-  /**
-   * Keeps trying to claim the controller-leader node for as long as none exists;
-   * on success records the leadership in the history and starts the controller.
-   * NOTE(review): the loop retries immediately, without any back-off, while the
-   * leader node is absent and the attempt fails.
-   *
-   * @param manager the manager that delivered the notification
-   * @param type    instance type of {@code manager}
-   * @throws Exception if connecting the dedicated controller manager fails
-   */
-  private void acquireLeadership(HelixManager manager, InstanceType type) throws Exception
-  {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    while (accessor.getProperty(keyBuilder.controllerLeader()) == null)
-    {
-      if (!tryUpdateController(manager))
-      {
-        continue;
-      }
-
-      updateHistory(manager);
-      if (type == InstanceType.CONTROLLER)
-      {
-        HelixControllerMain.addListenersToController(manager, _controller);
-        manager.startTimerTasks();
-      }
-      else if (type == InstanceType.CONTROLLER_PARTICIPANT)
-      {
-        String clusterName = manager.getClusterName();
-        String controllerName = manager.getInstanceName();
-        _leader =
-            HelixManagerFactory.getZKHelixManager(clusterName,
-                                                  controllerName,
-                                                  InstanceType.CONTROLLER,
-                                                  _zkAddr);
-
-        _leader.connect();
-        _leader.startTimerTasks();
-        HelixControllerMain.addListenersToController(_leader, _controller);
-      }
-    }
-  }
-
-  /**
-   * Disconnects the dedicated controller manager, if one was created, and drops
-   * the reference so it is neither retained nor disconnected a second time.
-   */
-  private void releaseLeadership()
-  {
-    if (_leader == null)
-    {
-      return;
-    }
-
-    try
-    {
-      _leader.disconnect();
-    }
-    finally
-    {
-      _leader = null;
-    }
-  }
-
-  /**
-   * Attempts to create the controller-leader node for this instance.
-   *
-   * @param manager the manager whose instance name, session id and version are
-   *                written into the leader record
-   * @return {@code true} if the leader node was created by this call, or if the
-   *         leader node that exists afterwards names this instance;
-   *         {@code false} otherwise
-   */
-  private boolean tryUpdateController(HelixManager manager)
-  {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LiveInstance leader = new LiveInstance(manager.getInstanceName());
-    try
-    {
-      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-      // TODO: this session id is not the leader's session id in distributed mode
-      leader.setSessionId(manager.getSessionId());
-      leader.setHelixVersion(manager.getVersion());
-
-      // look the singleton up once so the null check and the use see the same object
-      ZKPropertyTransferServer transferServer = ZKPropertyTransferServer.getInstance();
-      if (transferServer != null)
-      {
-        String zkPropertyTransferServiceUrl = transferServer.getWebserviceUrl();
-        if (zkPropertyTransferServiceUrl != null)
-        {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      }
-      else
-      {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
-
-      if (accessor.createProperty(keyBuilder.controllerLeader(), leader))
-      {
-        return true;
-      }
-      LOG.info("Unable to become leader probably because some other controller becames the leader");
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when trying to updating leader record in cluster:"
-                    + manager.getClusterName()
-                    + ". Need to check again whether leader node has been created or not",
-                e);
-    }
-
-    // creation failed or threw: re-read the node to see who the leader is
-    leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader != null)
-    {
-      String leaderName = leader.getInstanceName();
-      LOG.info("Leader exists for cluster:" + manager.getClusterName()
-          + ", currentLeader:" + leaderName);
-
-      if (leaderName != null && leaderName.equals(manager.getInstanceName()))
-      {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  /**
-   * Appends this instance to the controller leader history, creating the
-   * history record first when none is stored yet.
-   *
-   * @param manager the manager supplying cluster and instance names
-   */
-  private void updateHistory(HelixManager manager)
-  {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
-    if (history == null)
-    {
-      history = new LeaderHistory(PropertyType.HISTORY.toString());
-    }
-    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
-    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
-  }
-}