You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/03/04 20:04:53 UTC
[6/7] [HELIX-395] Remove old Helix alert/stat modules
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
deleted file mode 100644
index 0e9c8f1..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class GreaterAlertComparator extends AlertComparator {
-
- @Override
- /*
- * Returns true if any element left tuple exceeds any element in right tuple
- */
- public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
- Iterator<String> leftIter = leftTup.iterator();
- while (leftIter.hasNext()) {
- double leftVal = Double.parseDouble(leftIter.next());
- Iterator<String> rightIter = rightTup.iterator();
- while (rightIter.hasNext()) {
- double rightVal = Double.parseDouble(rightIter.next());
- if (leftVal > rightVal) {
- return true;
- }
- }
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
deleted file mode 100644
index 74a4688..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class MultiplyOperator extends Operator {
-
- public MultiplyOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = false;
- numOutputTupleLists = 1;
- }
-
- public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
- List out = new ArrayList();
- out.add(input.iterator());
- return out;
- }
-
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
- if (input == null || input.size() == 0) {
- return singleSetToIter(output);
- }
- while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
- // progress)
- Tuple<String> rowProduct = null;
- for (Iterator<Tuple<String>> it : input) {
- if (!it.hasNext()) { // when any iterator runs out, we are done
- return singleSetToIter(output);
- }
- rowProduct = multiplyTuples(rowProduct, it.next());
- }
- output.add(rowProduct);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
deleted file mode 100644
index 0612cf3..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-import java.util.List;
-
-public abstract class Operator {
-
- public int minInputTupleLists;
- public int maxInputTupleLists;
- public int numOutputTupleLists = -1;
- public boolean inputOutputTupleListsCountsEqual = false;
-
- public Operator() {
-
- }
-
- public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2) {
- if (tup1 == null) {
- return tup2;
- }
- if (tup2 == null) {
- return tup1;
- }
- Tuple<String> outputTup = new Tuple<String>();
-
- // sum staggers if the tuples are same length
- // e.g. 1,2,3 + 4,5 = 1,6,8
- // so this is a bit tricky
- Tuple<String> largerTup;
- Tuple<String> smallerTup;
- if (tup1.size() >= tup2.size()) {
- largerTup = tup1;
- smallerTup = tup2;
- } else {
- largerTup = tup2;
- smallerTup = tup1;
- }
- int gap = largerTup.size() - smallerTup.size();
-
- for (int i = 0; i < largerTup.size(); i++) {
- if (i < gap) {
- outputTup.add(largerTup.getElement(i));
- } else {
- double elementProduct = 0;
- elementProduct =
- Double.parseDouble(largerTup.getElement(i))
- * Double.parseDouble(smallerTup.getElement(i - gap));
- outputTup.add(String.valueOf(elementProduct));
- }
- }
- return outputTup;
- }
-
- public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2) {
- if (tup1 == null) {
- return tup2;
- }
- if (tup2 == null) {
- return tup1;
- }
- Tuple<String> outputTup = new Tuple<String>();
-
- // sum staggers if the tuples are same length
- // e.g. 1,2,3 + 4,5 = 1,6,8
- // so this is a bit tricky
- Tuple<String> largerTup;
- Tuple<String> smallerTup;
- if (tup1.size() >= tup2.size()) {
- largerTup = tup1;
- smallerTup = tup2;
- } else {
- largerTup = tup2;
- smallerTup = tup1;
- }
- int gap = largerTup.size() - smallerTup.size();
-
- for (int i = 0; i < largerTup.size(); i++) {
- if (i < gap) {
- outputTup.add(largerTup.getElement(i));
- } else {
- double elementSum = 0;
- elementSum =
- Double.parseDouble(largerTup.getElement(i))
- + Double.parseDouble(smallerTup.getElement(i - gap));
- outputTup.add(String.valueOf(elementSum));
- }
- }
- return outputTup;
- }
-
- public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
deleted file mode 100644
index 6895128..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class Stat {
- String _name;
- Tuple<String> _value;
- Tuple<String> _timestamp;
-
- public Stat(String name, Tuple<String> value, Tuple<String> timestamp) {
- _name = name;
- _value = value;
- _timestamp = timestamp;
- }
-
- public String getName() {
- return _name;
- }
-
- public Tuple<String> getValue() {
- return _value;
- }
-
- public Tuple<String> getTimestamp() {
- return _timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
deleted file mode 100644
index 1538eb8..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.stages.HealthDataCache;
-import org.apache.helix.model.PersistentStats;
-import org.apache.log4j.Logger;
-
-public class StatsHolder {
- enum MatchResult {
- WILDCARDMATCH,
- EXACTMATCH,
- NOMATCH
- };
-
- private static final Logger logger = Logger.getLogger(StatsHolder.class.getName());
-
- public static final String VALUE_NAME = "value";
- public static final String TIMESTAMP_NAME = "TimeStamp";
-
- HelixDataAccessor _accessor;
- HealthDataCache _cache;
-
- Map<String, Map<String, String>> _statMap;
- Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
- private Builder _keyBuilder;
-
- // PersistentStats _persistentStats;
-
- public StatsHolder(HelixManager manager, HealthDataCache cache) {
- _accessor = manager.getHelixDataAccessor();
- _cache = cache;
- _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
- updateCache(_cache);
- _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-
- }
-
- public void refreshStats() {
- logger.info("Refreshing cached stats");
- _cache.refresh(_accessor);
- updateCache(_cache);
- }
-
- public void persistStats() {
- // XXX: Am I using _accessor too directly here?
- // took around 35 ms from desktop to ESV4 machine
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
- if (stats == null) {
- stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
- // this record, if it
- // matters
- }
- stats.getRecord().setMapFields(_statMap);
- boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(), stats);
- }
-
- public void getStatsFromCache(boolean refresh) {
- long refreshStartTime = System.currentTimeMillis();
- if (refresh) {
- _cache.refresh(_accessor);
- }
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null) {
- _statMap = persistentStatRecord.getMapFields();
- } else {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- /*
- * if (_cache.getPersistentStats() != null) {
- * _statMap = _cache.getPersistentStats();
- * }
- */
- // TODO: confirm this a good place to init the _statMap when null
- /*
- * if (_statMap == null) {
- * _statMap = new HashMap<String, Map<String, String>>();
- * }
- */
- System.out.println("Refresh stats done: " + (System.currentTimeMillis() - refreshStartTime));
- }
-
- public Iterator<String> getAllStats() {
- return null;
- }
-
- /*
- * TODO: figure out pre-conditions here. I think not allowing anything to be
- * null on input
- */
- public Map<String, String> mergeStats(String statName, Map<String, String> existingStat,
- Map<String, String> incomingStat) throws HelixException {
- if (existingStat == null) {
- throw new HelixException("existing stat for merge is null");
- }
- if (incomingStat == null) {
- throw new HelixException("incoming stat for merge is null");
- }
- // get agg type and arguments, then get agg object
- String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
- String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
- Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
- // XXX: some of below lines might fail with null exceptions
-
- // get timestamps, values out of zk maps
- String existingTime = existingStat.get(TIMESTAMP_NAME);
- String existingVal = existingStat.get(VALUE_NAME);
- String incomingTime = incomingStat.get(TIMESTAMP_NAME);
- String incomingVal = incomingStat.get(VALUE_NAME);
- // parse values into tuples, if the values exist. else, tuples are null
- Tuple<String> existingTimeTuple =
- (existingTime != null) ? Tuple.fromString(existingTime) : null;
- Tuple<String> existingValueTuple = (existingVal != null) ? Tuple.fromString(existingVal) : null;
- Tuple<String> incomingTimeTuple =
- (incomingTime != null) ? Tuple.fromString(incomingTime) : null;
- Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple.fromString(incomingVal) : null;
-
- // dp merge
- agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple, incomingTimeTuple, aggArgs);
- // put merged tuples back in map
- Map<String, String> mergedMap = new HashMap<String, String>();
- if (existingTimeTuple.size() == 0) {
- throw new HelixException("merged time tuple has size zero");
- }
- if (existingValueTuple.size() == 0) {
- throw new HelixException("merged value tuple has size zero");
- }
-
- mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
- mergedMap.put(VALUE_NAME, existingValueTuple.toString());
- return mergedMap;
- }
-
- /*
- * Find all persisted stats this stat matches. Update those stats. An incoming
- * stat can match multiple stats exactly (if that stat has multiple agg types)
- * An incoming stat can match multiple wildcard stats
- */
-
- // need to do a time check here!
-
- public void applyStat(String incomingStatName, Map<String, String> statFields) {
- // TODO: consider locking stats here
- // refreshStats(); //will have refreshed by now during stage
-
- Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-
- if (!_statAlertMatchResult.containsKey(incomingStatName)) {
- _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
- }
- Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
- // traverse through all persistent stats
- for (String key : _statMap.keySet()) {
- if (resultMap.containsKey(key)) {
- MatchResult cachedMatchResult = resultMap.get(key);
- if (cachedMatchResult == MatchResult.EXACTMATCH) {
- processExactMatch(key, statFields);
- } else if (cachedMatchResult == MatchResult.WILDCARDMATCH) {
- processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
- }
- // don't care about NOMATCH
- continue;
- }
- // exact match on stat and stat portion of persisted stat, just update
- if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName)) {
- processExactMatch(key, statFields);
- resultMap.put(key, MatchResult.EXACTMATCH);
- }
- // wildcard match
- else if (ExpressionParser.isIncomingStatWildcardMatch(key, incomingStatName)) {
- processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
- resultMap.put(key, MatchResult.WILDCARDMATCH);
- } else {
- resultMap.put(key, MatchResult.NOMATCH);
- }
- }
- _statMap.putAll(pendingAdds);
- }
-
- void processExactMatch(String key, Map<String, String> statFields) {
- Map<String, String> mergedStat = mergeStats(key, _statMap.get(key), statFields);
- // update in place, no problem with hash map
- _statMap.put(key, mergedStat);
- }
-
- void processWildcardMatch(String incomingStatName, String key, Map<String, String> statFields,
- Map<String, Map<String, String>> pendingAdds) {
-
- // make sure incoming stat doesn't already exist, either in previous
- // round or this round
- // form new key (incomingStatName with agg type from the wildcarded
- // stat)
- String statToAdd = ExpressionParser.getWildcardStatSubstitution(key, incomingStatName);
- // if the stat already existed in _statMap, we have/will apply it as an
- // exact match
- // if the stat was added this round to pendingAdds, no need to recreate
- // (it would have same value)
- if (!_statMap.containsKey(statToAdd) && !pendingAdds.containsKey(statToAdd)) {
- // add this stat to persisted stats
- Map<String, String> mergedStat = mergeStats(statToAdd, getEmptyStat(), statFields);
- // add to pendingAdds so we don't mess up ongoing traversal of
- // _statMap
- pendingAdds.put(statToAdd, mergedStat);
- }
- }
-
- // add parsing of stat (or is that in expression holder?) at least add
- // validate
- public void addStat(String exp) throws HelixException {
- refreshStats(); // get current stats
-
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
- for (String stat : parsedStats) {
- if (_statMap.containsKey(stat)) {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- _statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- }
-
- public static Map<String, Map<String, String>> parseStat(String exp) throws HelixException {
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
- Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
- for (String stat : parsedStats) {
- if (statMap.containsKey(stat)) {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- return statMap;
- }
-
- public static Map<String, String> getEmptyStat() {
- Map<String, String> statFields = new HashMap<String, String>();
- statFields.put(TIMESTAMP_NAME, "");
- statFields.put(VALUE_NAME, "");
- return statFields;
- }
-
- public List<Stat> getStatsList() {
- List<Stat> stats = new LinkedList<Stat>();
- for (String stat : _statMap.keySet()) {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- Stat s = new Stat(stat, valTup, timeTup);
- stats.add(s);
- }
- return stats;
- }
-
- public Map<String, Tuple<String>> getStatsMap() {
- // refreshStats(); //don't refresh, stage will have refreshed by this time
- HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
- for (String stat : _statMap.keySet()) {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- stats.put(stat, valTup);
- }
- return stats;
- }
-
- public void updateCache(HealthDataCache cache) {
- _cache = cache;
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null) {
- _statMap = persistentStatRecord.getMapFields();
- } else {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 2cc733f..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
- public SumEachOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = true;
- numOutputTupleLists = -1;
- }
-
- // for each column, generate sum
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
- for (Iterator<Tuple<String>> currIt : input) {
- Tuple<String> currSum = null;
- while (currIt.hasNext()) {
- currSum = sumTuples(currSum, currIt.next());
- }
- ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
- currOutList.add(currSum);
- out.add(currOutList.iterator());
- }
- return out;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
deleted file mode 100644
index 90c9ab0..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
- public SumOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = false;
- numOutputTupleLists = 1;
- }
-
- public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
- List out = new ArrayList();
- out.add(input.iterator());
- return out;
- }
-
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
- if (input == null || input.size() == 0) {
- return singleSetToIter(output);
- }
- while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
- // progress)
- Tuple<String> rowSum = null;
- for (Iterator<Tuple<String>> it : input) {
- if (!it.hasNext()) { // when any iterator runs out, we are done
- return singleSetToIter(output);
- }
- rowSum = sumTuples(rowSum, it.next());
- }
- output.add(rowSum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
deleted file mode 100644
index e57f088..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class Tuple<T> {
- List<T> elements;
-
- public Tuple() {
- elements = new ArrayList<T>();
- }
-
- public int size() {
- return elements.size();
- }
-
- public void add(T entry) {
- elements.add(entry);
- }
-
- public void addAll(Tuple<T> incoming) {
- elements.addAll(incoming.getElements());
- }
-
- public Iterator<T> iterator() {
- return elements.listIterator();
- }
-
- public T getElement(int ind) {
- return elements.get(ind);
- }
-
- public List<T> getElements() {
- return elements;
- }
-
- public void clear() {
- elements.clear();
- }
-
- public static Tuple<String> fromString(String in) {
- Tuple<String> tup = new Tuple<String>();
- if (in.length() > 0) {
- String[] elements = in.split(",");
- for (String element : elements) {
- tup.add(element);
- }
- }
- return tup;
- }
-
- public String toString() {
- StringBuilder out = new StringBuilder();
- Iterator<T> it = iterator();
- boolean outEmpty = true;
- while (it.hasNext()) {
- if (!outEmpty) {
- out.append(",");
- }
- out.append(it.next());
- outEmpty = false;
- }
- return out.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 6ef4cfe..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class WindowAggregator extends Aggregator {
-
- int _windowSize;
-
- public WindowAggregator(String windowSize) {
- _windowSize = Integer.parseInt(windowSize);
- _numArgs = 1;
- }
-
- public WindowAggregator() {
- this("1");
- }
-
- @Override
- public void merge(Tuple<String> currValTup, Tuple<String> newValTup, Tuple<String> currTimeTup,
- Tuple<String> newTimeTup, String... args) {
-
- _windowSize = Integer.parseInt(args[0]);
-
- // figure out how many curr tuple values we displace
- Tuple<String> mergedTimeTuple = new Tuple<String>();
- Tuple<String> mergedValTuple = new Tuple<String>();
-
- Iterator<String> currTimeIter = currTimeTup.iterator();
- Iterator<String> currValIter = currValTup.iterator();
- Iterator<String> newTimeIter = newTimeTup.iterator();
- Iterator<String> newValIter = newValTup.iterator();
- int currCtr = 0;
- // traverse current vals
- double currTime = -1;
- double currVal;
- while (currTimeIter.hasNext()) {
- currTime = Double.parseDouble(currTimeIter.next());
- currVal = Double.parseDouble(currValIter.next());
- currCtr++;
- // number of evicted currVals equal to total size of both minus _windowSize
- if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted
- // element, just bump
- // down
- mergedTimeTuple.add(String.valueOf(currTime));
- mergedValTuple.add(String.valueOf(currVal));
- }
- }
-
- double newVal;
- double newTime;
- while (newTimeIter.hasNext()) {
- newVal = Double.parseDouble(newValIter.next());
- newTime = Double.parseDouble(newTimeIter.next());
- if (newTime <= currTime) { // oldest new time older than newest curr time. we will not apply
- // new tuple!
- return; // curr tuples remain the same
- }
- currCtr++;
- if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted element
- mergedTimeTuple.add(String.valueOf(newTime));
- mergedValTuple.add(String.valueOf(newVal));
- }
- }
- // set curr tuples to merged tuples
- currTimeTup.clear();
- currTimeTup.addAll(mergedTimeTuple);
- currValTup.clear();
- currValTup.addAll(mergedValTuple);
- // TODO: see if we can do merger in place on curr
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
deleted file mode 100644
index bf1d9a6..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for Helix alerts
- */
-package org.apache.helix.alerts;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..856c09b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -32,10 +32,8 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
@@ -95,8 +93,8 @@ public class Cluster {
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused, boolean autoJoinAllowed) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -118,8 +116,7 @@ public class Cluster {
new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
.addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
.addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
- .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
- .build();
+ .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -224,22 +221,6 @@ public class Cluster {
}
/**
- * Get all the persisted stats for the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _config.getStats();
- }
-
- /**
- * Get all the persisted alerts for the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _config.getAlerts();
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 1195ce2..f08d857 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -25,15 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Controller;
import org.apache.helix.api.Participant;
@@ -53,7 +48,6 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -65,7 +59,6 @@ import org.apache.helix.model.Leader;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
@@ -125,9 +118,6 @@ public class ClusterAccessor {
if (cluster.autoJoinAllowed()) {
clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
}
- if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
- _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
- }
if (cluster.isPaused()) {
pauseCluster();
}
@@ -169,16 +159,6 @@ public class ClusterAccessor {
ClusterConstraints constraint = constraints.get(type);
_accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
}
- if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.persistantStat());
- } else {
- _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
- }
- if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.alerts());
- } else {
- _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
- }
return true;
}
@@ -261,15 +241,9 @@ public class ClusterAccessor {
// read the state model definitions
Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
- // read the stats
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
- // read the alerts
- Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
-
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
}
/**
@@ -452,144 +426,6 @@ public class ClusterAccessor {
}
/**
- * Get the stats persisted on this cluster
- * @return PersistentStats, or null if none persisted
- */
- public PersistentStats readStats() {
- return _accessor.getProperty(_keyBuilder.persistantStat());
- }
-
- /**
- * Add a statistic specification to the cluster. Existing stat specifications will not be
- * overwritten
- * @param statName string representing a stat specification
- * @return true if the stat spec was added, false otherwise
- */
- public boolean addStat(final String statName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String persistentStatsPath = _keyBuilder.persistantStat().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- statsRec = new ZNRecord(PersistentStats.nodeName);
- }
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- for (String newStat : newStatMap.keySet()) {
- if (!currStatMap.containsKey(newStat)) {
- currStatMap.put(newStat, newStatMap.get(newStat));
- }
- }
- statsRec.setMapFields(currStatMap);
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Remove a statistic specification from the cluster
- * @param statName string representing a statistic specification
- * @return true if stats removed, false otherwise
- */
- public boolean dropStat(final String statName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String persistentStatsPath = _keyBuilder.persistantStat().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- throw new HelixException("No stats record in ZK, nothing to drop");
- }
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- // delete each stat from stat map
- for (String newStat : newStatMap.keySet()) {
- if (currStatMap.containsKey(newStat)) {
- currStatMap.remove(newStat);
- }
- }
- statsRec.setMapFields(currStatMap);
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Add an alert specification to the cluster
- * @param alertName string representing the alert spec
- * @return true if added, false otherwise
- */
- public boolean addAlert(final String alertName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- String alertsPath = _keyBuilder.alerts().getPath();
- return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- alertsRec = new ZNRecord(Alerts.nodeName);
- }
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- StringBuilder newStatName = new StringBuilder();
- Map<String, String> newAlertMap = new HashMap<String, String>();
-
- // use AlertsHolder to get map of new stats and map for this alert
- AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
- // add stat
- addStat(newStatName.toString());
-
- // add alert
- currAlertMap.put(alertName, newAlertMap);
- alertsRec.setMapFields(currAlertMap);
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Remove an alert specification from the cluster
- * @param alertName string representing an alert specification
- * @return true if removed, false otherwise
- */
- public boolean dropAlert(final String alertName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String alertsPath = _keyBuilder.alerts().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- throw new HelixException("No alerts record persisted, nothing to drop");
- }
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- currAlertMap.remove(alertName);
- alertsRec.setMapFields(currAlertMap);
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
* Add user configuration to the existing cluster user configuration. Overwrites properties with
* the same key
* @param userConfig the user config key-value pairs to add
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..4672280 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
- private final PersistentStats _stats;
- private final Alerts _alerts;
private final UserConfig _userConfig;
private final boolean _isPaused;
private final boolean _autoJoin;
@@ -83,15 +77,13 @@ public class ClusterConfig {
private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused, boolean allowAutoJoin) {
_id = id;
_resourceMap = ImmutableMap.copyOf(resourceMap);
_participantMap = ImmutableMap.copyOf(participantMap);
_constraintMap = ImmutableMap.copyOf(constraintMap);
_stateModelMap = ImmutableMap.copyOf(stateModelMap);
- _stats = stats;
- _alerts = alerts;
_userConfig = userConfig;
_isPaused = isPaused;
_autoJoin = allowAutoJoin;
@@ -237,22 +229,6 @@ public class ClusterConfig {
}
/**
- * Get all the statistics persisted on the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _stats;
- }
-
- /**
- * Get all the alerts persisted on the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _alerts;
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
@@ -287,8 +263,6 @@ public class ClusterConfig {
private Set<Fields> _updateFields;
private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
- private PersistentStats _removedStats;
- private Alerts _removedAlerts;
private Builder _builder;
/**
@@ -302,8 +276,6 @@ public class ClusterConfig {
Set<ConstraintId> constraints = Sets.newHashSet();
_removedConstraints.put(type, constraints);
}
- _removedStats = new PersistentStats(PersistentStats.nodeName);
- _removedAlerts = new Alerts(Alerts.nodeName);
_builder = new Builder(clusterId);
}
@@ -431,57 +403,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat string specifying the stat specification
- * @return Delta
- */
- public Delta addStat(String stat) {
- _builder.addStat(stat);
- return this;
- }
-
- /**
- * Add an alert specification for the cluster. Existing specifications will not be overwritten
- * @param alert string specifying the alert specification
- * @return Delta
- */
- public Delta addAlert(String alert) {
- _builder.addAlert(alert);
- return this;
- }
-
- /**
- * Remove a statistic specification from the cluster
- * @param stat statistic specification
- * @return Delta
- */
- public Delta removeStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- return this;
- }
-
- /**
- * Remove an alert specification for the cluster
- * @param alert alert specification
- * @return Delta
- */
- public Delta removeAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- removeStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
* Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
* @param orig the original ClusterConfig
* @return updated ClusterConfig
@@ -494,8 +415,7 @@ public class ClusterConfig {
.addParticipants(orig.getParticipantMap().values())
.addStateModelDefinitions(orig.getStateModelMap().values())
.userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
- .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
- .addAlerts(orig.getAlerts());
+ .autoJoin(orig.autoJoinAllowed());
for (Fields field : _updateFields) {
switch (field) {
case USER_CONFIG:
@@ -529,28 +449,9 @@ public class ClusterConfig {
builder.addConstraint(constraints);
}
- // add stats and alerts
- builder.addStats(deltaConfig.getStats());
- builder.addAlerts(deltaConfig.getAlerts());
-
// get the result
ClusterConfig result = builder.build();
- // remove stats
- PersistentStats stats = result.getStats();
- for (String removedStat : _removedStats.getMapFields().keySet()) {
- if (stats.getMapFields().containsKey(removedStat)) {
- stats.getMapFields().remove(removedStat);
- }
- }
-
- // remove alerts
- Alerts alerts = result.getAlerts();
- for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
- if (alerts.getMapFields().containsKey(removedAlert)) {
- alerts.getMapFields().remove(removedAlert);
- }
- }
return result;
}
}
@@ -565,8 +466,6 @@ public class ClusterConfig {
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
private UserConfig _userConfig;
- private PersistentStats _stats;
- private Alerts _alerts;
private boolean _isPaused;
private boolean _autoJoin;
@@ -583,8 +482,6 @@ public class ClusterConfig {
_isPaused = false;
_autoJoin = false;
_userConfig = new UserConfig(Scope.cluster(id));
- _stats = new PersistentStats(PersistentStats.nodeName);
- _alerts = new Alerts(Alerts.nodeName);
}
/**
@@ -789,74 +686,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat String specifying the stat specification
- * @return Builder
- */
- public Builder addStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add statistic specifications to the cluster. Existing specifications will not be overwritten
- * @param stats PersistentStats specifying the stat specification
- * @return Builder
- */
- public Builder addStats(PersistentStats stats) {
- if (stats == null) {
- return this;
- }
- Map<String, Map<String, String>> parsedStat = stats.getMapFields();
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alert string representing alert specifications
- * @return Builder
- */
- public Builder addAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- addStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alerts Alerts instance
- * @return Builder
- */
- public Builder addAlerts(Alerts alerts) {
- if (alerts == null) {
- return this;
- }
- Map<String, Map<String, String>> alertMap = alerts.getMapFields();
- for (String alert : alertMap.keySet()) {
- addAlert(alert);
- }
- return this;
- }
-
- /**
* Set the paused status of the cluster
* @param isPaused true if paused, false otherwise
* @return Builder
@@ -892,7 +721,7 @@ public class ClusterConfig {
*/
public ClusterConfig build() {
return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
- _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+ _userConfig, _isPaused, _autoJoin);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ab51670..b9527e6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -32,7 +32,6 @@ import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
@@ -58,7 +57,6 @@ import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Leader;
@@ -84,7 +82,7 @@ import org.apache.log4j.Logger;
*/
public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
- ExternalViewChangeListener, ControllerChangeListener, HealthStateChangeListener {
+ ExternalViewChangeListener, ControllerChangeListener {
private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
volatile boolean init = false;
private final PipelineRegistry _registry;
@@ -311,16 +309,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
@Override
- public void onHealthChange(String instanceName, List<HealthStat> reports,
- NotificationContext changeContext) {
- /**
- * When there are more participant ( > 20, can be in hundreds), This callback can be
- * called quite frequently as each participant reports health stat every minute. Thus
- * we change the health check pipeline to run in a timer callback.
- */
- }
-
- @Override
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onMessage()");
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 6b29e2d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.AlertStatus;
-import org.apache.helix.model.Alerts;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-
-public class HealthDataCache {
- Map<String, LiveInstance> _liveInstanceMap;
-
- Map<String, Map<String, HealthStat>> _healthStatMap;
- HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- PersistentStats _persistentStats;
- Alerts _alerts;
- AlertStatus _alertStatus;
-
- public HealthStat getGlobalStats() {
- return _globalStats;
- }
-
- public PersistentStats getPersistentStats() {
- return _persistentStats;
- }
-
- public Alerts getAlerts() {
- return _alerts;
- }
-
- public AlertStatus getAlertStatus() {
- return _alertStatus;
- }
-
- public Map<String, HealthStat> getHealthStats(String instanceName) {
- Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- if (map != null) {
- return map;
- } else {
- return Collections.emptyMap();
- }
- }
-
- public Map<String, LiveInstance> getLiveInstances() {
- return _liveInstanceMap;
- }
-
- public boolean refresh(HelixDataAccessor accessor) {
- Builder keyBuilder = accessor.keyBuilder();
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
- Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
- for (String instanceName : _liveInstanceMap.keySet()) {
- // xxx clearly getting znodes for the instance here...so get the
- // timestamp!
-
- Map<String, HealthStat> childValuesMap =
- accessor.getChildValuesMap(keyBuilder.healthReports(instanceName));
- hsMap.put(instanceName, childValuesMap);
- }
- _healthStatMap = Collections.unmodifiableMap(hsMap);
- _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
- _alerts = accessor.getProperty(keyBuilder.alerts());
- _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
- return true;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index 859c1d0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-
-public class ReadHealthDataStage extends AbstractBaseStage {
- HealthDataCache _cache;
-
- public ReadHealthDataStage() {
- _cache = new HealthDataCache();
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null) {
- throw new StageException("HelixManager attribute value is null");
- }
- // DataAccessor dataAccessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- _cache.refresh(accessor);
-
- event.addAttribute("HealthDataCache", _cache);
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
deleted file mode 100644
index c48f156..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ /dev/null
@@ -1,399 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertParser;
-import org.apache.helix.alerts.AlertProcessor;
-import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.ExpressionParser;
-import org.apache.helix.alerts.StatsHolder;
-import org.apache.helix.alerts.Tuple;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.healthcheck.StatHealthReportProvider;
-import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.log4j.Logger;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- */
-public class StatsAggregationStage extends AbstractBaseStage {
-
- public static final int ALERT_HISTORY_SIZE = 30;
-
- private static final Logger logger = Logger.getLogger(StatsAggregationStage.class.getName());
-
- StatsHolder _statsHolder = null;
- AlertsHolder _alertsHolder = null;
- Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
- Map<String, Tuple<String>> _statStatus;
- ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
- Map<String, String> _alertActionTaken = new HashMap<String, String>();
-
- public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
- public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
- public final String REPORT_NAME = "AggStats";
- // public final String DEFAULT_AGG_TYPE = "decay";
- // public final String DEFAULT_DECAY_PARAM = "0.1";
- // public final String DEFAULT_AGG_TYPE = "window";
- // public final String DEFAULT_DECAY_PARAM = "5";
-
- public StatHealthReportProvider _aggStatsProvider;
-
- // public AggregationType _defaultAggType;
-
- public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus() {
- return _alertStatus;
- }
-
- public Map<String, Tuple<String>> getStatStatus() {
- return _statStatus;
- }
-
- public void persistAggStats(HelixManager manager) {
- Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
- Map<String, Map<String, String>> partitionReport =
- _aggStatsProvider.getRecentPartitionHealthReport();
- ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
- if (report != null) {
- record.setSimpleFields(report);
- }
- if (partitionReport != null) {
- record.setMapFields(partitionReport);
- }
-
- // DataAccessor accessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- // boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
- Builder keyBuilder = accessor.keyBuilder();
- boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
- if (retVal == false) {
- logger.error("attempt to persist derived stats failed");
- }
- }
-
- @Override
- public void init(StageContext context) {
- }
-
- public String getAgeStatName(String instance) {
- return instance + ExpressionParser.statFieldDelim + "reportingage";
- }
-
- // currTime in seconds
- public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime) {
- String statName = getAgeStatName(instance.getInstanceName());
- long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
- // seconds
- Map<String, String> ageStatMap = new HashMap<String, String>();
- ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
- ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
- // note that applyStat will only work if alert already added
- _statsHolder.applyStat(statName, ageStatMap);
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- // String aggTypeName =
- // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
- // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
-
- HelixManager manager = event.getAttribute("helixmanager");
- HealthDataCache cache = event.getAttribute("HealthDataCache");
-
- if (manager == null || cache == null) {
- throw new StageException("helixmanager|HealthDataCache attribute value is null");
- }
- if (_alertsHolder == null) {
- _statsHolder = new StatsHolder(manager, cache);
- _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
- } else {
- _statsHolder.updateCache(cache);
- _alertsHolder.updateCache(cache);
- }
- if (_statsHolder.getStatsList().size() == 0) {
- if (logger.isTraceEnabled()) {
- logger.trace("stat holder is empty");
- }
- return;
- }
-
- // init agg stats from cache
- // initAggStats(cache);
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-
- long currTime = System.currentTimeMillis();
- // for each live node, read node's stats
- long readInstancesStart = System.currentTimeMillis();
- for (LiveInstance instance : liveInstances.values()) {
- String instanceName = instance.getInstanceName();
- logger.debug("instanceName: " + instanceName);
- // XXX: now have map of HealthStats, so no need to traverse them...verify
- // correctness
- Map<String, HealthStat> stats;
- stats = cache.getHealthStats(instanceName);
- // find participants stats
- long modTime = -1;
- // TODO: get healthreport child node modified time and reportAgeStat based on that
- boolean reportedAge = false;
- for (HealthStat participantStat : stats.values()) {
- if (participantStat != null && !reportedAge) {
- // generate and report stats for how old this node's report is
- modTime = participantStat.getLastModifiedTimeStamp();
- reportAgeStat(instance, modTime, currTime);
- reportedAge = true;
- }
- // System.out.println(modTime);
- // XXX: need to convert participantStat to a better format
- // need to get instanceName in here
-
- if (participantStat != null) {
- // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
- // REPORT LEVEL TS
- Map<String, Map<String, String>> statMap = participantStat.getHealthFields(instanceName);
- for (String key : statMap.keySet()) {
- _statsHolder.applyStat(key, statMap.get(key));
- }
- }
- }
- }
- // Call _statsHolder.persistStats() once per pipeline. This will
- // write the updated persisted stats into zookeeper
- _statsHolder.persistStats();
- logger.info("Done processing stats: " + (System.currentTimeMillis() - readInstancesStart));
- // populate _statStatus
- _statStatus = _statsHolder.getStatsMap();
-
- for (String statKey : _statStatus.keySet()) {
- logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
- }
-
- long alertExecuteStartTime = System.currentTimeMillis();
- // execute alerts, populate _alertStatus
- _alertStatus =
- AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
- logger.info("done executing alerts: " + (System.currentTimeMillis() - alertExecuteStartTime));
- for (String originAlertName : _alertStatus.keySet()) {
- _alertBeanCollection.setAlerts(originAlertName, _alertStatus.get(originAlertName),
- manager.getClusterName());
- }
-
- executeAlertActions(manager);
- // Write alert fire history to zookeeper
- updateAlertHistory(manager);
- long writeAlertStartTime = System.currentTimeMillis();
- // write out alert status (to zk)
- _alertsHolder.addAlertStatusSet(_alertStatus);
- logger.info("done writing alerts: " + (System.currentTimeMillis() - writeAlertStartTime));
-
- // TODO: access the 2 status variables from somewhere to populate graphs
-
- long logAlertStartTime = System.currentTimeMillis();
- // logging alert status
- for (String alertOuterKey : _alertStatus.keySet()) {
- logger.debug("Alert Outer Key: " + alertOuterKey);
- Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
- if (alertInnerMap == null) {
- logger.debug(alertOuterKey + " has no alerts to report.");
- continue;
- }
- for (String alertInnerKey : alertInnerMap.keySet()) {
- logger.debug(" " + alertInnerKey + " value: "
- + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
- + alertInnerMap.get(alertInnerKey).isFired());
- }
- }
-
- logger.info("done logging alerts: " + (System.currentTimeMillis() - logAlertStartTime));
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- logger.info("process end: " + processLatency);
- }
-
- /**
- * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
- * has been fired
- */
-
- void executeAlertActions(HelixManager manager) {
- _alertActionTaken.clear();
- // Go through the original alert strings
- for (String originAlertName : _alertStatus.keySet()) {
- Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
- if (alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME)) {
- String actionValue = alertFields.get(AlertParser.ACTION_NAME);
- Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
- if (alertResultMap == null) {
- logger.info("Alert " + originAlertName + " does not have alert status map");
- continue;
- }
- // For each original alert, iterate all actual alerts that it expands into
- for (String actualStatName : alertResultMap.keySet()) {
- // if the actual alert is fired, execute the action
- if (alertResultMap.get(actualStatName).isFired()) {
- logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by "
- + actualStatName);
- _alertActionTaken.put(actualStatName, actionValue);
- // move functionalities into a seperate class
- executeAlertAction(actualStatName, actionValue, manager);
- }
- }
- }
- }
- }
-
- /**
- * Execute the action if an alert is fired, and the alert has an action associated with it.
- * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
- */
- void executeAlertAction(String actualStatName, String actionValue, HelixManager manager) {
- if (actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- if (instanceName != null) {
- logger.info("Disabling instance " + instanceName);
- manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName,
- false);
- }
- } else if (actionValue.equals(ActionOnError.DISABLE_PARTITION.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- String partitionName = parsePartitionName(actualStatName, manager);
- if (instanceName != null && resourceName != null && partitionName != null) {
- logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
- manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(),
- instanceName, resourceName, Arrays.asList(partitionName));
- }
- } else if (actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString())) {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- logger.info("Disabling resource " + resourceName + " instanceName " + instanceName
- + " not implemented");
-
- }
- }
-
- public static String parseResourceName(String actualStatName, HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
- for (IdealState idealState : idealStates) {
- String resourceName = idealState.getResourceId().stringify();
- if (actualStatName.contains("=" + resourceName + ".")
- || actualStatName.contains("=" + resourceName + ";")) {
- return resourceName;
- }
- }
- return null;
- }
-
- public static String parsePartitionName(String actualStatName, HelixManager manager) {
- String resourceName = parseResourceName(actualStatName, manager);
- if (resourceName != null) {
- String partitionKey = "=" + resourceName + "_";
- if (actualStatName.contains(partitionKey)) {
- int pos = actualStatName.indexOf(partitionKey);
- int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
- int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
- if (nextCommaPos > 0 && nextCommaPos < nextDotPos) {
- nextDotPos = nextCommaPos;
- }
-
- String partitionName = actualStatName.substring(pos + 1, nextDotPos);
- return partitionName;
- }
- }
- return null;
- }
-
- public static String parseInstanceName(String actualStatName, HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
- for (LiveInstance instance : liveInstances) {
- String instanceName = instance.getInstanceName();
- if (actualStatName.startsWith(instanceName)) {
- return instanceName;
- }
- }
- return null;
- }
-
- void updateAlertHistory(HelixManager manager) {
- // Write alert fire history to zookeeper
- _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
- Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
- // Update history only when some beans has changed
- if (delta.size() > 0) {
- delta.putAll(_alertActionTaken);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
- String date = dateFormat.format(new Date());
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
- ZNRecord alertFiredHistory;
- if (property == null) {
- alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
- } else {
- alertFiredHistory = property.getRecord();
- }
- while (alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE) {
- // ZNRecord uses TreeMap which is sorted ascending internally
- String firstKey = (String) (alertFiredHistory.getMapFields().keySet().toArray()[0]);
- alertFiredHistory.getMapFields().remove(firstKey);
- }
- alertFiredHistory.setMapField(date, delta);
- // manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
- accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
- _alertBeanCollection.setAlertHistory(alertFiredHistory);
- }
- }
-
- public ClusterAlertMBeanCollection getClusterAlertMBeanCollection() {
- return _alertBeanCollection;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
deleted file mode 100644
index a3c443f..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-public class AccumulateAggregationType implements AggregationType {
-
- private static final Logger logger = Logger.getLogger(AccumulateAggregationType.class);
-
- public final static String TYPE_NAME = "accumulate";
-
- @Override
- public String getName() {
- return TYPE_NAME;
- }
-
- @Override
- public String merge(String iv, String ev, long prevTimestamp) {
- double inVal = Double.parseDouble(iv);
- double existingVal = Double.parseDouble(ev);
- return String.valueOf(inVal + existingVal);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
deleted file mode 100644
index 29f5921..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface AggregationType {
-
- // public abstract <T extends Object> T merge(T iv, T ev);
-
- public final static String DELIM = "#";
-
- public abstract String merge(String incomingVal, String existingVal, long prevTimestamp);
-
- public abstract String getName();
-}