You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 21:58:02 UTC
[07/17] [HELIX-395] Remove old Helix alert/stat modules
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
deleted file mode 100644
index c48f156..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ /dev/null
@@ -1,399 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertParser;
-import org.apache.helix.alerts.AlertProcessor;
-import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.ExpressionParser;
-import org.apache.helix.alerts.StatsHolder;
-import org.apache.helix.alerts.Tuple;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.healthcheck.StatHealthReportProvider;
-import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.log4j.Logger;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- */
-public class StatsAggregationStage extends AbstractBaseStage {
-
- public static final int ALERT_HISTORY_SIZE = 30;
-
- private static final Logger logger = Logger.getLogger(StatsAggregationStage.class.getName());
-
- StatsHolder _statsHolder = null;
- AlertsHolder _alertsHolder = null;
- Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
- Map<String, Tuple<String>> _statStatus;
- ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
- Map<String, String> _alertActionTaken = new HashMap<String, String>();
-
- public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
- public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
- public final String REPORT_NAME = "AggStats";
- // public final String DEFAULT_AGG_TYPE = "decay";
- // public final String DEFAULT_DECAY_PARAM = "0.1";
- // public final String DEFAULT_AGG_TYPE = "window";
- // public final String DEFAULT_DECAY_PARAM = "5";
-
- public StatHealthReportProvider _aggStatsProvider;
-
- // public AggregationType _defaultAggType;
-
- public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus() {
- return _alertStatus;
- }
-
- public Map<String, Tuple<String>> getStatStatus() {
- return _statStatus;
- }
-
- public void persistAggStats(HelixManager manager) {
- Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
- Map<String, Map<String, String>> partitionReport =
- _aggStatsProvider.getRecentPartitionHealthReport();
- ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
- if (report != null) {
- record.setSimpleFields(report);
- }
- if (partitionReport != null) {
- record.setMapFields(partitionReport);
- }
-
- // DataAccessor accessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- // boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
- Builder keyBuilder = accessor.keyBuilder();
- boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
- if (retVal == false) {
- logger.error("attempt to persist derived stats failed");
- }
- }
-
- @Override
- public void init(StageContext context) {
- }
-
- public String getAgeStatName(String instance) {
- return instance + ExpressionParser.statFieldDelim + "reportingage";
- }
-
- // currTime in seconds
- public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime) {
- String statName = getAgeStatName(instance.getInstanceName());
- long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
- // seconds
- Map<String, String> ageStatMap = new HashMap<String, String>();
- ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
- ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
- // note that applyStat will only work if alert already added
- _statsHolder.applyStat(statName, ageStatMap);
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- // String aggTypeName =
- // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
- // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
-
- HelixManager manager = event.getAttribute("helixmanager");
- HealthDataCache cache = event.getAttribute("HealthDataCache");
-
- if (manager == null || cache == null) {
- throw new StageException("helixmanager|HealthDataCache attribute value is null");
- }
- if (_alertsHolder == null) {
- _statsHolder = new StatsHolder(manager, cache);
- _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
- } else {
- _statsHolder.updateCache(cache);
- _alertsHolder.updateCache(cache);
- }
- if (_statsHolder.getStatsList().size() == 0) {
- if (logger.isTraceEnabled()) {
- logger.trace("stat holder is empty");
- }
- return;
- }
-
- // init agg stats from cache
- // initAggStats(cache);
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-
- long currTime = System.currentTimeMillis();
- // for each live node, read node's stats
- long readInstancesStart = System.currentTimeMillis();
- for (LiveInstance instance : liveInstances.values()) {
- String instanceName = instance.getInstanceName();
- logger.debug("instanceName: " + instanceName);
- // XXX: now have map of HealthStats, so no need to traverse them...verify
- // correctness
- Map<String, HealthStat> stats;
- stats = cache.getHealthStats(instanceName);
- // find participants stats
- long modTime = -1;
- // TODO: get healthreport child node modified time and reportAgeStat based on that
- boolean reportedAge = false;
- for (HealthStat participantStat : stats.values()) {
- if (participantStat != null && !reportedAge) {
- // generate and report stats for how old this node's report is
- modTime = participantStat.getLastModifiedTimeStamp();
- reportAgeStat(instance, modTime, currTime);
- reportedAge = true;
- }
- // System.out.println(modTime);
- // XXX: need to convert participantStat to a better format
- // need to get instanceName in here
-
- if (participantStat != null) {
- // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
- // REPORT LEVEL TS
- Map<String, Map<String, String>> statMap = participantStat.getHealthFields(instanceName);
- for (String key : statMap.keySet()) {
- _statsHolder.applyStat(key, statMap.get(key));
- }
- }
- }
- }
- // Call _statsHolder.persistStats() once per pipeline. This will
- // write the updated persisted stats into zookeeper
- _statsHolder.persistStats();
- logger.info("Done processing stats: " + (System.currentTimeMillis() - readInstancesStart));
- // populate _statStatus
- _statStatus = _statsHolder.getStatsMap();
-
- for (String statKey : _statStatus.keySet()) {
- logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
- }
-
- long alertExecuteStartTime = System.currentTimeMillis();
- // execute alerts, populate _alertStatus
- _alertStatus =
- AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
- logger.info("done executing alerts: " + (System.currentTimeMillis() - alertExecuteStartTime));
- for (String originAlertName : _alertStatus.keySet()) {
- _alertBeanCollection.setAlerts(originAlertName, _alertStatus.get(originAlertName),
- manager.getClusterName());
- }
-
- executeAlertActions(manager);
- // Write alert fire history to zookeeper
- updateAlertHistory(manager);
- long writeAlertStartTime = System.currentTimeMillis();
- // write out alert status (to zk)
- _alertsHolder.addAlertStatusSet(_alertStatus);
- logger.info("done writing alerts: " + (System.currentTimeMillis() - writeAlertStartTime));
-
- // TODO: access the 2 status variables from somewhere to populate graphs
-
- long logAlertStartTime = System.currentTimeMillis();
- // logging alert status
- for (String alertOuterKey : _alertStatus.keySet()) {
- logger.debug("Alert Outer Key: " + alertOuterKey);
- Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
- if (alertInnerMap == null) {
- logger.debug(alertOuterKey + " has no alerts to report.");
- continue;
- }
- for (String alertInnerKey : alertInnerMap.keySet()) {
- logger.debug(" " + alertInnerKey + " value: "
- + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
- + alertInnerMap.get(alertInnerKey).isFired());
- }
- }
-
- logger.info("done logging alerts: " + (System.currentTimeMillis() - logAlertStartTime));
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- logger.info("process end: " + processLatency);
- }
-
- /**
- * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
- * has been fired
- */
-
- void executeAlertActions(HelixManager manager) {
- _alertActionTaken.clear();
- // Go through the original alert strings
- for (String originAlertName : _alertStatus.keySet()) {
- Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
- if (alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME)) {
- String actionValue = alertFields.get(AlertParser.ACTION_NAME);
- Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
- if (alertResultMap == null) {
- logger.info("Alert " + originAlertName + " does not have alert status map");
- continue;
- }
- // For each original alert, iterate all actual alerts that it expands into
- for (String actualStatName : alertResultMap.keySet()) {
- // if the actual alert is fired, execute the action
- if (alertResultMap.get(actualStatName).isFired()) {
- logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by "
- + actualStatName);
- _alertActionTaken.put(actualStatName, actionValue);
- // move functionalities into a seperate class
- executeAlertAction(actualStatName, actionValue, manager);
- }
- }
- }
- }
- }
-
- /**
- * Execute the action if an alert is fired, and the alert has an action associated with it.
- * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
- */
- void executeAlertAction(String actualStatName, String actionValue, HelixManager manager) {
- if (actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- if (instanceName != null) {
- logger.info("Disabling instance " + instanceName);
- manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName,
- false);
- }
- } else if (actionValue.equals(ActionOnError.DISABLE_PARTITION.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- String partitionName = parsePartitionName(actualStatName, manager);
- if (instanceName != null && resourceName != null && partitionName != null) {
- logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
- manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(),
- instanceName, resourceName, Arrays.asList(partitionName));
- }
- } else if (actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- logger.info("Disabling resource " + resourceName + " instanceName " + instanceName
- + " not implemented");
-
- }
- }
-
- public static String parseResourceName(String actualStatName, HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
- for (IdealState idealState : idealStates) {
- String resourceName = idealState.getResourceId().stringify();
- if (actualStatName.contains("=" + resourceName + ".")
- || actualStatName.contains("=" + resourceName + ";")) {
- return resourceName;
- }
- }
- return null;
- }
-
- public static String parsePartitionName(String actualStatName, HelixManager manager) {
- String resourceName = parseResourceName(actualStatName, manager);
- if (resourceName != null) {
- String partitionKey = "=" + resourceName + "_";
- if (actualStatName.contains(partitionKey)) {
- int pos = actualStatName.indexOf(partitionKey);
- int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
- int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
- if (nextCommaPos > 0 && nextCommaPos < nextDotPos) {
- nextDotPos = nextCommaPos;
- }
-
- String partitionName = actualStatName.substring(pos + 1, nextDotPos);
- return partitionName;
- }
- }
- return null;
- }
-
- public static String parseInstanceName(String actualStatName, HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
- for (LiveInstance instance : liveInstances) {
- String instanceName = instance.getInstanceName();
- if (actualStatName.startsWith(instanceName)) {
- return instanceName;
- }
- }
- return null;
- }
-
- void updateAlertHistory(HelixManager manager) {
- // Write alert fire history to zookeeper
- _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
- Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
- // Update history only when some beans has changed
- if (delta.size() > 0) {
- delta.putAll(_alertActionTaken);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
- String date = dateFormat.format(new Date());
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
- ZNRecord alertFiredHistory;
- if (property == null) {
- alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
- } else {
- alertFiredHistory = property.getRecord();
- }
- while (alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE) {
- // ZNRecord uses TreeMap which is sorted ascending internally
- String firstKey = (String) (alertFiredHistory.getMapFields().keySet().toArray()[0]);
- alertFiredHistory.getMapFields().remove(firstKey);
- }
- alertFiredHistory.setMapField(date, delta);
- // manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
- accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
- _alertBeanCollection.setAlertHistory(alertFiredHistory);
- }
- }
-
- public ClusterAlertMBeanCollection getClusterAlertMBeanCollection() {
- return _alertBeanCollection;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
deleted file mode 100644
index a3c443f..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-public class AccumulateAggregationType implements AggregationType {
-
- private static final Logger logger = Logger.getLogger(AccumulateAggregationType.class);
-
- public final static String TYPE_NAME = "accumulate";
-
- @Override
- public String getName() {
- return TYPE_NAME;
- }
-
- @Override
- public String merge(String iv, String ev, long prevTimestamp) {
- double inVal = Double.parseDouble(iv);
- double existingVal = Double.parseDouble(ev);
- return String.valueOf(inVal + existingVal);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
deleted file mode 100644
index 29f5921..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface AggregationType {
-
- // public abstract <T extends Object> T merge(T iv, T ev);
-
- public final static String DELIM = "#";
-
- public abstract String merge(String incomingVal, String existingVal, long prevTimestamp);
-
- public abstract String getName();
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
deleted file mode 100644
index d946641..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.StringTokenizer;
-
-import org.apache.log4j.Logger;
-
-public class AggregationTypeFactory {
- private static final Logger logger = Logger.getLogger(AggregationTypeFactory.class);
-
- public AggregationTypeFactory() {
- }
-
- // TODO: modify this function so that it takes a single string, but can parse
- // apart params from type
- public static AggregationType getAggregationType(String input) {
- if (input == null) {
- logger.error("AggregationType name is null");
- return null;
- }
- StringTokenizer tok = new StringTokenizer(input, AggregationType.DELIM);
- String type = tok.nextToken();
- int numParams = tok.countTokens();
- String[] params = new String[numParams];
- for (int i = 0; i < numParams; i++) {
- if (!tok.hasMoreTokens()) {
- logger.error("Trying to parse non-existent params");
- return null;
- }
- params[i] = tok.nextToken();
- }
-
- if (type.equals(AccumulateAggregationType.TYPE_NAME)) {
- return new AccumulateAggregationType();
- } else if (type.equals(DecayAggregationType.TYPE_NAME)) {
- if (params.length < 1) {
- logger.error("DecayAggregationType must contain <decay weight> parameter");
- return null;
- }
- return new DecayAggregationType(Double.parseDouble(params[0]));
- } else if (type.equals(WindowAggregationType.TYPE_NAME)) {
- if (params.length < 1) {
- logger.error("WindowAggregationType must contain <window size> parameter");
- }
- return new WindowAggregationType(Integer.parseInt(params[0]));
- } else {
- logger.error("Unknown AggregationType " + type);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
deleted file mode 100644
index 2409b84..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-public class DecayAggregationType implements AggregationType {
-
- private static final Logger logger = Logger.getLogger(DecayAggregationType.class);
-
- public final static String TYPE_NAME = "decay";
-
- double _decayFactor = 0.1;
-
- public DecayAggregationType(double df) {
- super();
- _decayFactor = df;
- }
-
- @Override
- public String getName() {
- StringBuilder sb = new StringBuilder();
- sb.append(TYPE_NAME);
- sb.append(DELIM);
- sb.append(_decayFactor);
- return sb.toString();
- }
-
- @Override
- public String merge(String iv, String ev, long prevTimestamp) {
- double incomingVal = Double.parseDouble(iv);
- double existingVal = Double.parseDouble(ev);
- long currTimestamp = System.currentTimeMillis();
- double minutesOld = (currTimestamp - prevTimestamp) / 60000.0;
- // come up with decay coeff for old value. More time passed, the more it
- // decays
- double oldDecayCoeff = Math.pow((1 - _decayFactor), minutesOld);
- return String.valueOf((oldDecayCoeff * existingVal + (1 - oldDecayCoeff) * incomingVal));
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
deleted file mode 100644
index 619667c..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-class DefaultHealthReportProvider extends HealthReportProvider {
- private static final Logger _logger = Logger.getLogger(DefaultHealthReportProvider.class);
-
- public final static String _availableCPUs = "availableCPUs";
- public final static String _freePhysicalMemory = "freePhysicalMemory";
- public final static String _totalJvmMemory = "totalJvmMemory";
- public final static String _freeJvmMemory = "freeJvmMemory";
- public final static String _averageSystemLoad = "averageSystemLoad";
-
- public DefaultHealthReportProvider() {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport() {
- OperatingSystemMXBean osMxBean = ManagementFactory.getOperatingSystemMXBean();
- long freeJvmMemory = Runtime.getRuntime().freeMemory();
- long totalJvmMemory = Runtime.getRuntime().totalMemory();
- int availableCPUs = osMxBean.getAvailableProcessors();
- double avgSystemLoad = osMxBean.getSystemLoadAverage();
- long freePhysicalMemory = Long.MAX_VALUE;
-
- try {
- // if( osMxBean instanceof com.sun.management.OperatingSystemMXBean)
- // {
- // com.sun.management.OperatingSystemMXBean sunOsMxBean
- // = (com.sun.management.OperatingSystemMXBean) osMxBean;
- // freePhysicalMemory = sunOsMxBean.getFreePhysicalMemorySize();
- // }
- } catch (Throwable t) {
- _logger.error(t);
- }
-
- Map<String, String> result = new TreeMap<String, String>();
-
- result.put(_availableCPUs, "" + availableCPUs);
- result.put(_freePhysicalMemory, "" + freePhysicalMemory);
- result.put(_freeJvmMemory, "" + freeJvmMemory);
- result.put(_totalJvmMemory, "" + totalJvmMemory);
- result.put(_averageSystemLoad, "" + avgSystemLoad);
-
- return result;
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
- Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
-
- result.put(getReportName(), getRecentHealthReport());
- return result;
- }
-
- @Override
- public void resetStats() {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
deleted file mode 100644
index e1afd5c..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-
-@Deprecated
-public class DefaultPerfCounters extends ZNRecord {
- private static final Logger _logger = Logger.getLogger(DefaultPerfCounters.class);
-
- public final static String _availableCPUs = "availableCPUs";
- public final static String _freePhysicalMemory = "freePhysicalMemory";
- public final static String _totalJvmMemory = "totalJvmMemory";
- public final static String _freeJvmMemory = "freeJvmMemory";
- public final static String _averageSystemLoad = "averageSystemLoad";
-
- public DefaultPerfCounters(String instanceName, long availableCPUs, long freePhysicalMemory,
- long freeJvmMemory, long totalJvmMemory, double averageSystemLoad) {
- super("DefaultPerfCounters");
- setSimpleField("instanceName", instanceName);
- setSimpleField("createTime", new Date().toString());
-
- setSimpleField(_availableCPUs, "" + availableCPUs);
- setSimpleField(_freePhysicalMemory, "" + freePhysicalMemory);
- setSimpleField(_freeJvmMemory, "" + freeJvmMemory);
- setSimpleField(_totalJvmMemory, "" + totalJvmMemory);
- setSimpleField(_averageSystemLoad, "" + averageSystemLoad);
- }
-
- public long getAvailableCpus() {
- return getSimpleLongVal(_availableCPUs);
- }
-
- public double getAverageSystemLoad() {
- return getSimpleDoubleVal(_averageSystemLoad);
- }
-
- public long getTotalJvmMemory() {
- return getSimpleLongVal(_totalJvmMemory);
- }
-
- public long getFreeJvmMemory() {
- return getSimpleLongVal(_freeJvmMemory);
- }
-
- public long getFreePhysicalMemory() {
- return getSimpleLongVal(_freePhysicalMemory);
- }
-
- long getSimpleLongVal(String key) {
- String strVal = getSimpleField(key);
- if (strVal == null) {
- return 0;
- }
- try {
- return Long.parseLong(strVal);
- } catch (Exception e) {
- _logger.warn(e);
- return 0;
- }
- }
-
- double getSimpleDoubleVal(String key) {
- String strVal = getSimpleField(key);
- if (strVal == null) {
- return 0;
- }
- try {
- return Double.parseDouble(strVal);
- } catch (Exception e) {
- _logger.warn(e);
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
deleted file mode 100644
index f7afd04..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-public abstract class HealthReportProvider {
- public static final String _defaultPerfCounters = "defaultPerfCounters";
-
- public abstract Map<String, String> getRecentHealthReport();
-
- public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
- return null;
- }
-
- public abstract void resetStats();
-
- public String getReportName() {
- return _defaultPerfCounters;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
deleted file mode 100644
index 05ffaef..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.helix.HelixTimerTask;
-import org.apache.log4j.Logger;
-
-public class HealthStatsAggregationTask extends HelixTimerTask {
- private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
- public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
-
- final HealthStatsAggregator _healthStatsAggregator;
-
- class HealthStatsAggregationTaskTimer extends TimerTask {
-
- @Override
- public void run() {
- _healthStatsAggregator.aggregate();
- }
-
- }
-
- private Timer _timer;
- private final int _delay;
- private final int _period;
-
- public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator, int delay,
- int period) {
- _healthStatsAggregator = healthStatsAggregator;
-
- _delay = delay;
- _period = period;
- }
-
- public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator) {
- this(healthStatsAggregator, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
- }
-
- @Override
- public void start() {
-
- if (_timer == null) {
- LOG.info("START HealthStatsAggregationTimerTask");
-
- // Remove all the previous health check values, if any
- _healthStatsAggregator.init();
-
- _timer = new Timer("HealthStatsAggregationTimerTask", true);
- _timer.scheduleAtFixedRate(new HealthStatsAggregationTaskTimer(),
- new Random().nextInt(_delay), _period);
- } else {
- LOG.warn("HealthStatsAggregationTimerTask already started");
- }
- }
-
- @Override
- public synchronized void stop() {
- if (_timer != null) {
- LOG.info("Stop HealthStatsAggregationTimerTask");
- _timer.cancel();
- _healthStatsAggregator.reset();
- _timer = null;
- } else {
- LOG.warn("HealthStatsAggregationTimerTask already stopped");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
deleted file mode 100644
index bc95e6d..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ReadHealthDataStage;
-import org.apache.helix.controller.stages.StatsAggregationStage;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
-import org.apache.log4j.Logger;
-
-public class HealthStatsAggregator {
- private static final Logger LOG = Logger.getLogger(HealthStatsAggregator.class);
-
- public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
-
- private final HelixManager _manager;
- private final Pipeline _healthStatsAggregationPipeline;
- private final ClusterAlertMBeanCollection _alertItemCollection;
- private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
- new HashMap<String, HelixStageLatencyMonitor>();
-
- public HealthStatsAggregator(HelixManager manager) {
- _manager = manager;
-
- // health stats pipeline
- _healthStatsAggregationPipeline = new Pipeline();
- _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
- StatsAggregationStage statAggregationStage = new StatsAggregationStage();
- _healthStatsAggregationPipeline.addStage(statAggregationStage);
- _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
-
- registerStageLatencyMonitor(_healthStatsAggregationPipeline);
- }
-
- private void registerStageLatencyMonitor(Pipeline pipeline) {
- for (Stage stage : pipeline.getStages()) {
- String stgName = stage.getStageName();
- if (!_stageLatencyMonitorMap.containsKey(stgName)) {
- try {
- _stageLatencyMonitorMap.put(stage.getStageName(),
- new HelixStageLatencyMonitor(_manager.getClusterName(), stgName));
- } catch (Exception e) {
- LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
- }
- } else {
- LOG.error("StageLatencyMonitor for stage: " + stgName + " already exists. Skip register it");
- }
- }
- }
-
- public synchronized void aggregate() {
- if (!isEnabled()) {
- LOG.info("HealthAggregationTask is disabled.");
- return;
- }
-
- if (!_manager.isLeader()) {
- LOG.error("Cluster manager: " + _manager.getInstanceName()
- + " is not leader. Pipeline will not be invoked");
- return;
- }
-
- try {
- ClusterEvent event = new ClusterEvent("healthChange");
- event.addAttribute("helixmanager", _manager);
- event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
-
- _healthStatsAggregationPipeline.handle(event);
- _healthStatsAggregationPipeline.finish();
- } catch (Exception e) {
- LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline, e);
- }
- }
-
- private boolean isEnabled() {
- ConfigAccessor configAccessor = _manager.getConfigAccessor();
- boolean enabled = true;
- if (configAccessor != null) {
- // zk-based cluster manager
- ConfigScope scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
- String isEnabled = configAccessor.get(scope, "healthChange.enabled");
- if (isEnabled != null) {
- enabled = new Boolean(isEnabled);
- }
- } else {
- LOG.debug("File-based cluster manager doesn't support disable healthChange");
- }
- return enabled;
- }
-
- public void init() {
- // Remove all the previous health check values, if any
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- List<String> existingHealthRecordNames =
- accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
- for (String healthReportName : existingHealthRecordNames) {
- LOG.info("Removing old healthrecord " + healthReportName);
- accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),
- healthReportName));
- }
-
- }
-
- public void reset() {
- _alertItemCollection.reset();
-
- for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values()) {
- stgLatencyMonitor.reset();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
deleted file mode 100644
index 266ed8b..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ZNRecord;
-
-public interface ParticipantHealthReportCollector {
- public abstract void addHealthReportProvider(HealthReportProvider provider);
-
- public abstract void removeHealthReportProvider(HealthReportProvider provider);
-
- public abstract void reportHealthReportMessage(ZNRecord healthReport);
-
- public abstract void transmitHealthReports();
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
deleted file mode 100644
index 9023641..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.LinkedList;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.StatsHolder;
-import org.apache.helix.model.HealthStat;
-import org.apache.log4j.Logger;
-
-public class ParticipantHealthReportCollectorImpl implements ParticipantHealthReportCollector {
- private final LinkedList<HealthReportProvider> _healthReportProviderList =
- new LinkedList<HealthReportProvider>();
- private static final Logger _logger = Logger
- .getLogger(ParticipantHealthReportCollectorImpl.class);
- private final HelixManager _helixManager;
- String _instanceName;
-
- public ParticipantHealthReportCollectorImpl(HelixManager helixManager, String instanceName) {
- _helixManager = helixManager;
- _instanceName = instanceName;
- addDefaultHealthCheckInfoProvider();
- }
-
- private void addDefaultHealthCheckInfoProvider() {
- addHealthReportProvider(new DefaultHealthReportProvider());
- }
-
- @Override
- public void addHealthReportProvider(HealthReportProvider provider) {
- try {
- synchronized (_healthReportProviderList) {
- if (!_healthReportProviderList.contains(provider)) {
- _healthReportProviderList.add(provider);
- } else {
- _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
- }
- }
- } catch (Exception e) {
- _logger.error(e);
- }
- }
-
- @Override
- public void removeHealthReportProvider(HealthReportProvider provider) {
- synchronized (_healthReportProviderList) {
- if (_healthReportProviderList.contains(provider)) {
- _healthReportProviderList.remove(provider);
- } else {
- _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
- }
- }
- }
-
- @Override
- public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate) {
- HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
- new HealthStat(healthCheckInfoUpdate));
-
- }
-
- @Override
- public synchronized void transmitHealthReports() {
- synchronized (_healthReportProviderList) {
- for (HealthReportProvider provider : _healthReportProviderList) {
- try {
- Map<String, String> report = provider.getRecentHealthReport();
- Map<String, Map<String, String>> partitionReport =
- provider.getRecentPartitionHealthReport();
- ZNRecord record = new ZNRecord(provider.getReportName());
- if (report != null) {
- record.setSimpleFields(report);
- }
- if (partitionReport != null) {
- record.setMapFields(partitionReport);
- }
- record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
-
- HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
- new HealthStat(record));
-
- provider.resetStats();
- } catch (Exception e) {
- _logger.error("fail to transmit health report", e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
deleted file mode 100644
index 59d74c7..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.helix.HelixTimerTask;
-import org.apache.log4j.Logger;
-
-public class ParticipantHealthReportTask extends HelixTimerTask {
- private static final Logger LOG = Logger.getLogger(ParticipantHealthReportTask.class);
- public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
-
- Timer _timer;
- final ParticipantHealthReportCollectorImpl _healthReportCollector;
-
- class ParticipantHealthReportTimerTask extends TimerTask {
-
- @Override
- public void run() {
- _healthReportCollector.transmitHealthReports();
- }
- }
-
- public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector) {
- _healthReportCollector = healthReportCollector;
- }
-
- @Override
- public void start() {
- if (_timer == null) {
- LOG.info("Start HealthCheckInfoReportingTask");
- _timer = new Timer("ParticipantHealthReportTimerTask", true);
- _timer.scheduleAtFixedRate(new ParticipantHealthReportTimerTask(),
- new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
- } else {
- LOG.warn("ParticipantHealthReportTimerTask already started");
- }
- }
-
- @Override
- public void stop() {
- if (_timer != null) {
- LOG.info("Stop ParticipantHealthReportTimerTask");
- _timer.cancel();
- _timer = null;
- } else {
- LOG.warn("ParticipantHealthReportTimerTask already stopped");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
deleted file mode 100644
index 6bc33d3..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-public class PerformanceHealthReportProvider extends HealthReportProvider {
-
- private static final Logger _logger = Logger.getLogger(PerformanceHealthReportProvider.class);
-
- public final static String _testStat = "testStat";
- public final static String _readLatencyStat = "readLatencyStat";
- public final static String _requestCountStat = "requestCountStat";
- public final static String _partitionRequestCountStat = "partitionRequestCountStat";
-
- public static final String _performanceCounters = "performanceCounters";
-
- public int readLatencyCount = 0;
- public double readLatencySum = 0;
-
- public int requestCount = 0;
-
- // private final Map<String, String> _partitionCountsMap = new HashMap<String,
- // String>();
-
- private final Map<String, HashMap<String, String>> _partitionStatMaps =
- new HashMap<String, HashMap<String, String>>();
-
- public PerformanceHealthReportProvider() {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport() {
- long testStat = 10;
-
- Map<String, String> result = new TreeMap<String, String>();
-
- result.put(_testStat, "" + testStat);
- result.put(_readLatencyStat, "" + readLatencySum / readLatencyCount);
- result.put(_requestCountStat, "" + requestCount);
-
- return result;
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
- Map<String, Map<String, String>> result = new TreeMap<String, Map<String, String>>();
- for (String statName : _partitionStatMaps.keySet()) {
- result.put(statName, _partitionStatMaps.get(statName));
- }
- return result;
- }
-
- HashMap<String, String> getStatMap(String statName, boolean createIfMissing) {
- // check if map for this stat exists. if not, create it
- HashMap<String, String> statMap;
- if (!_partitionStatMaps.containsKey(statName)) {
- if (!createIfMissing) {
- return null;
- }
- statMap = new HashMap<String, String>();
- _partitionStatMaps.put(statName, statMap);
- } else {
- statMap = _partitionStatMaps.get(statName);
- }
- return statMap;
- }
-
- // TODO:
- // Currently participant is source of truth and updates ZK. We want ZK to be
- // source of truth.
- // Revise this approach the participant sends deltas of stats to controller
- // (ZK?) and have controller do aggregation
- // and update ZK. Make sure to wipe the participant between uploads.
- String getPartitionStat(HashMap<String, String> partitionMap, String partitionName) {
- return partitionMap.get(partitionName);
- }
-
- void setPartitionStat(HashMap<String, String> partitionMap, String partitionName, String value) {
- partitionMap.put(partitionName, value);
- }
-
- public void incrementPartitionStat(String statName, String partitionName) {
- HashMap<String, String> statMap = getStatMap(statName, true);
- String currValStr = getPartitionStat(statMap, partitionName);
- double currVal;
- if (currValStr == null) {
- currVal = 1.0;
- } else {
- currVal = Double.parseDouble(getPartitionStat(statMap, partitionName));
- currVal++;
- }
- setPartitionStat(statMap, partitionName, String.valueOf(currVal));
- }
-
- public void submitPartitionStat(String statName, String partitionName, String value) {
- HashMap<String, String> statMap = getStatMap(statName, true);
- setPartitionStat(statMap, partitionName, value);
- }
-
- public String getPartitionStat(String statName, String partitionName) {
- HashMap<String, String> statMap = getStatMap(statName, false);
- if (statMap == null) {
- return null;
- } else {
- return statMap.get(partitionName);
- }
- }
-
- public void resetStats() {
- _partitionStatMaps.clear();
- }
-
- public String getReportName() {
- return _performanceCounters;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java b/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
deleted file mode 100644
index e675792..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class Stat {
-
- private static final Logger _logger = Logger.getLogger(Stat.class);
-
- public final static String OP_TYPE = "HTTP_OP";
- public final static String MEASUREMENT_TYPE = "MEASUREMENT";
- public final static String RESOURCE_NAME = "RESOURCE_NAME";
- public final static String PARTITION_NAME = "PARTITION_NAME";
- public final static String NODE_NAME = "NODE_NAME";
- public final static String TIMESTAMP = "TIMESTAMP";
- public final static String RETURN_STATUS = "RETURN_STATUS";
- public final static String METRIC_NAME = "METRIC_NAME";
- public final static String AGG_TYPE = "AGG_TYPE";
-
- public String _opType;
- public String _measurementType;
- public String _resourceName;
- public String _partitionName;
- public String _nodeName;
- public String _returnStatus;
- public String _metricName;
- public String _aggTypeName;
- public String _timestamp;
-
- public Stat(String opType, String measurementType, String resourceName, String partitionName,
- String nodeName) {
- // this(opType, measurementType, resourceName, partitionName, nodeName,
- // null, null, null);
- this(opType, measurementType, resourceName, partitionName, nodeName, null, null, null);
- }
-
- public Stat(String opType, String measurementType, String resourceName, String partitionName,
- String nodeName, String returnStatus, String metricName, AggregationType aggType) {
- this._opType = opType;
- this._measurementType = measurementType;
- this._resourceName = resourceName;
- this._partitionName = partitionName;
- this._nodeName = nodeName;
- this._returnStatus = returnStatus;
- this._metricName = metricName;
- this._aggTypeName = null;
- if (aggType != null) {
- this._aggTypeName = aggType.getName();
- }
-
- _timestamp = String.valueOf(System.currentTimeMillis());
- }
-
- public Stat(Map<String, String> in) {
- _opType = in.get(OP_TYPE);
- _measurementType = in.get(MEASUREMENT_TYPE);
- _resourceName = in.get(RESOURCE_NAME);
- _partitionName = in.get(PARTITION_NAME);
- _nodeName = in.get(NODE_NAME);
- _timestamp = String.valueOf(System.currentTimeMillis());
- }
-
- public void setAggType(AggregationType aggType) {
- this._aggTypeName = aggType.getName();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof Stat)) {
- return false;
- }
- Stat other = (Stat) obj;
- if (!_partitionName.equals(other._partitionName)) {
- return false;
- }
- if (!_opType.equals(other._opType)) {
- return false;
- }
- if (!_measurementType.equals(other._measurementType)) {
- return false;
- }
- if (!_resourceName.equals(other._resourceName)) {
- return false;
- }
- if (!_nodeName.equals(other._nodeName)) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- return (_partitionName + _opType + _measurementType + _resourceName + _nodeName).hashCode();
- }
-
- public void addAlert(long value) {
- // TODO Auto-generated method stub
-
- }
-
- public String toString() {
- return _nodeName + "." + _resourceName + "." + _partitionName + "." + _opType + "."
- + _measurementType + "." + _returnStatus + "." + _metricName + "." + _aggTypeName;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
deleted file mode 100644
index 82b2d31..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
+++ /dev/null
@@ -1,159 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-public class StatHealthReportProvider extends HealthReportProvider {
-
- private static final Logger _logger = Logger.getLogger(StatHealthReportProvider.class);
-
- /*
- * public final static String _testStat = "testStat"; public final static
- * String _readLatencyStat = "readLatencyStat"; public final static String
- * _requestCountStat = "requestCountStat"; public final static String
- * _partitionRequestCountStat = "partitionRequestCountStat";
- */
-
- public static final String REPORT_NAME = "ParticipantStats";
- public String _reportName = REPORT_NAME;
-
- public static final String STAT_VALUE = "value";
- public static final String TIMESTAMP = "timestamp";
-
- public int readLatencyCount = 0;
- public double readLatencySum = 0;
-
- public int requestCount = 0;
-
- // private final Map<String, String> _partitionCountsMap = new HashMap<String,
- // String>();
-
- // private final Map<String, HashMap<String,String>> _partitionStatMaps = new
- // HashMap<String, HashMap<String,String>>();
- private final ConcurrentHashMap<String, String> _statsToValues =
- new ConcurrentHashMap<String, String>();
- private final ConcurrentHashMap<String, String> _statsToTimestamps =
- new ConcurrentHashMap<String, String>();
-
- public StatHealthReportProvider() {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport() {
- return null;
- }
-
- // TODO: function is misnamed, but return type is what I want
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
- Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
- for (String stat : _statsToValues.keySet()) {
- Map<String, String> currStat = new HashMap<String, String>();
- /*
- * currStat.put(Stat.OP_TYPE, stat._opType);
- * currStat.put(Stat.MEASUREMENT_TYPE, stat._measurementType);
- * currStat.put(Stat.NODE_NAME, stat._nodeName);
- * currStat.put(Stat.PARTITION_NAME, stat._partitionName);
- * currStat.put(Stat.RESOURCE_NAME, stat._resourceName);
- * currStat.put(Stat.RETURN_STATUS, stat._returnStatus);
- * currStat.put(Stat.METRIC_NAME, stat._metricName);
- * currStat.put(Stat.AGG_TYPE, stat._aggTypeName);
- */
- currStat.put(TIMESTAMP, _statsToTimestamps.get(stat));
- currStat.put(STAT_VALUE, _statsToValues.get(stat));
- result.put(stat, currStat);
- }
- return result;
- }
-
- public boolean contains(Stat inStat) {
- return _statsToValues.containsKey(inStat);
- }
-
- public Set<String> keySet() {
- return _statsToValues.keySet();
- }
-
- public String getStatValue(Stat inStat) {
- return _statsToValues.get(inStat);
- }
-
- public long getStatTimestamp(Stat inStat) {
- return Long.parseLong(_statsToTimestamps.get(inStat));
- }
-
- /*
- * public String getStatValue(String opType, String measurementType, String
- * resourceName, String partitionName, String nodeName, boolean
- * createIfMissing) { Stat rs = new Stat(opType, measurementType,
- * resourceName, partitionName, nodeName); String val =
- * _statsToValues.get(rs); if (val == null && createIfMissing) { val = "0";
- * _statsToValues.put(rs, val); } return val; }
- */
-
- public void writeStat(String statName, String val, String timestamp) {
- _statsToValues.put(statName, val);
- _statsToTimestamps.put(statName, timestamp);
- }
-
- /*
- * public void setStat(Stat es, String val, String timestamp) { writeStat(es,
- * val, timestamp); }
- * public void setStat(String opType, String measurementType, String
- * resourceName, String partitionName, String nodeName, double val, String
- * timestamp) { Stat rs = new Stat(opType, measurementType, resourceName,
- * partitionName, nodeName); writeStat(rs, String.valueOf(val), timestamp); }
- */
-
- public void incrementStat(String statName, String timestamp) {
- // Stat rs = new Stat(opType, measurementType, resourceName, partitionName,
- // nodeName);
- String val = _statsToValues.get(statName);
- if (val == null) {
- val = "0";
- } else {
- val = String.valueOf(Double.parseDouble(val) + 1);
- }
- writeStat(statName, val, timestamp);
- }
-
- public int size() {
- return _statsToValues.size();
- }
-
- public void resetStats() {
- _statsToValues.clear();
- _statsToTimestamps.clear();
- }
-
- public void setReportName(String name) {
- _reportName = name;
- }
-
- public String getReportName() {
- return _reportName;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
deleted file mode 100644
index 77161af..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-public class WindowAggregationType implements AggregationType {
-
- private static final Logger logger = Logger.getLogger(WindowAggregationType.class);
-
- public final String WINDOW_DELIM = "#";
-
- public final static String TYPE_NAME = "window";
-
- int _windowSize = 1;
-
- public WindowAggregationType(int ws) {
- super();
- _windowSize = ws;
- }
-
- @Override
- public String getName() {
- StringBuilder sb = new StringBuilder();
- sb.append(TYPE_NAME);
- sb.append(DELIM);
- sb.append(_windowSize);
- return sb.toString();
- }
-
- @Override
- public String merge(String incomingVal, String existingVal, long prevTimestamp) {
- String[] windowVals;
- if (existingVal == null) {
- return incomingVal;
- } else {
- windowVals = existingVal.split(WINDOW_DELIM);
- int currLength = windowVals.length;
- // window not full
- if (currLength < _windowSize) {
- return existingVal + WINDOW_DELIM + incomingVal;
- }
- // evict oldest
- else {
- int firstDelim = existingVal.indexOf(WINDOW_DELIM);
- return existingVal.substring(firstDelim + 1) + WINDOW_DELIM + incomingVal;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java b/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
deleted file mode 100644
index f584b5b..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Helix health check classes
- *
- */
-package org.apache.helix.healthcheck;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index b844926..65fe2f9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -41,7 +41,6 @@ import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -59,7 +58,6 @@ import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -209,15 +207,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
subscribeForChanges(changeContext, _path, true, false);
controllerChangelistener.onControllerChange(changeContext);
- } else if (_changeType == ChangeType.HEALTH) {
- HealthStateChangeListener healthStateChangeListener = (HealthStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
- // settings here
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
- List<HealthStat> healthReportList = _accessor.getChildValues(_propertyKey);
-
- healthStateChangeListener.onHealthChange(instanceName, healthReportList, changeContext);
}
long end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index e13c127..ef17715 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -25,7 +25,6 @@ import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixAutoController;
import org.apache.helix.HelixConnection;
@@ -49,7 +48,6 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.SessionId;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -136,13 +134,6 @@ public class HelixConnectionAdaptor implements HelixManager {
}
@Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
- throws Exception {
- _connection.addHealthStateChangeListener(_role, listener, _clusterId,
- ParticipantId.from(instanceName));
- }
-
- @Override
public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
_connection.addExternalViewChangeListener(_role, listener, _clusterId);
}
@@ -203,11 +194,6 @@ public class HelixConnectionAdaptor implements HelixManager {
}
@Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public InstanceType getInstanceType() {
return _instanceType;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 925c52f..c1d856d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -278,15 +278,4 @@ public class ParticipantManagerHelper {
}
- /**
- * create zk path for health check info
- * TODO move it to cluster-setup
- */
- public void createHealthCheckPath() {
- String healthCheckInfoPath = _dataAccessor.keyBuilder().healthReports(_instanceName).getPath();
- if (!_zkclient.exists(healthCheckInfoPath)) {
- _zkclient.createPersistent(healthCheckInfoPath, true);
- LOG.info("Created healthcheck info path " + healthCheckInfoPath);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index da0c80c..dee343f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -52,8 +52,6 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.MessageId;
@@ -63,7 +61,6 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
@@ -80,7 +77,6 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
@@ -749,79 +745,6 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void addStat(String clusterName, final String statName) {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String persistentStatsPath =
- PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- // TODO: fix naming of this record, if it matters
- statsRec = new ZNRecord(PersistentStats.nodeName);
- }
-
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- for (String newStat : newStatMap.keySet()) {
- if (!currStatMap.containsKey(newStat)) {
- currStatMap.put(newStat, newStatMap.get(newStat));
- }
- }
- statsRec.setMapFields(currStatMap);
-
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void addAlert(final String clusterName, final String alertName) {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
- baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
-
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- // TODO: fix naming of this record, if it matters
- alertsRec = new ZNRecord(Alerts.nodeName);
-
- }
-
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- StringBuilder newStatName = new StringBuilder();
- Map<String, String> newAlertMap = new HashMap<String, String>();
-
- // use AlertsHolder to get map of new stats and map for this alert
- AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
- // add stat
- addStat(clusterName, newStatName.toString());
-
- // add alert
- currAlertMap.put(alertName, newAlertMap);
-
- alertsRec.setMapFields(currAlertMap);
-
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
public void dropCluster(String clusterName) {
logger.info("Deleting cluster " + clusterName);
HelixDataAccessor accessor =
@@ -842,70 +765,6 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void dropStat(String clusterName, final String statName) {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String persistentStatsPath =
- PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- throw new HelixException("No stats record in ZK, nothing to drop");
- }
-
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-
- // delete each stat from stat map
- for (String newStat : newStatMap.keySet()) {
- if (currStatMap.containsKey(newStat)) {
- currStatMap.remove(newStat);
- }
- }
- statsRec.setMapFields(currStatMap);
-
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void dropAlert(String clusterName, final String alertName) {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- if (!baseAccessor.exists(alertsPath, 0)) {
- throw new HelixException("No alerts node in ZK, nothing to drop");
- }
-
- baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- throw new HelixException("No alerts record in ZK, nothing to drop");
- }
-
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- currAlertMap.remove(alertName);
- alertsRec.setMapFields(currAlertMap);
-
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
public void addClusterToGrandCluster(String clusterName, String grandCluster) {
if (!ZKUtil.isClusterSetup(grandCluster, _zkClient)) {
throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");