You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[7/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
new file mode 100644
index 0000000..d307c2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Base class for alert operators. An operator consumes a list of tuple
+ * iterators and produces a list of tuple iterators via {@link #execute(List)}.
+ * It also offers element-wise arithmetic helpers for tuples whose elements are
+ * numbers encoded as strings.
+ */
+public abstract class Operator {
+
+  // Arity metadata. The operator subclasses in this package assign these in
+  // their constructors.
+  // NOTE(review): presumably consumed by whatever wires operators together --
+  // confirm against callers.
+  public int minInputTupleLists;
+  public int maxInputTupleLists;
+  public int numOutputTupleLists = -1;
+  public boolean inputOutputTupleListsCountsEqual = false;
+
+  public Operator()
+  {
+  }
+
+  /**
+   * Multiplies two tuples element-wise, aligning them at their last elements.
+   *
+   * @param tup1 first operand, may be null
+   * @param tup2 second operand, may be null
+   * @return the other operand if one is null (null if both are); otherwise a
+   *         new tuple as long as the longer operand
+   * @throws NumberFormatException if an overlapping element is not a number
+   */
+  public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2)
+  {
+    return combineTuples(tup1, tup2, true);
+  }
+
+  /**
+   * Adds two tuples element-wise, aligning them at their last elements,
+   * e.g. 1,2,3 + 4,5 = 1,6,8.
+   *
+   * @param tup1 first operand, may be null
+   * @param tup2 second operand, may be null
+   * @return the other operand if one is null (null if both are); otherwise a
+   *         new tuple as long as the longer operand
+   * @throws NumberFormatException if an overlapping element is not a number
+   */
+  public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2)
+  {
+    return combineTuples(tup1, tup2, false);
+  }
+
+  /**
+   * Shared implementation of {@link #sumTuples} and {@link #multiplyTuples}.
+   * When the tuples differ in length the shorter one is right-aligned against
+   * the longer one: the leading elements of the longer tuple are copied
+   * through unchanged (and unparsed), the remaining positions are combined.
+   */
+  private static Tuple<String> combineTuples(Tuple<String> tup1, Tuple<String> tup2,
+      boolean multiply)
+  {
+    if (tup1 == null) {
+      return tup2;
+    }
+    if (tup2 == null) {
+      return tup1;
+    }
+    // On equal sizes tup1 plays the role of the longer tuple.
+    Tuple<String> longer;
+    Tuple<String> shorter;
+    if (tup1.size() >= tup2.size()) {
+      longer = tup1;
+      shorter = tup2;
+    } else {
+      longer = tup2;
+      shorter = tup1;
+    }
+    int gap = longer.size() - shorter.size();
+
+    Tuple<String> result = new Tuple<String>();
+    for (int i = 0; i < longer.size(); i++) {
+      if (i < gap) {
+        // no counterpart in the shorter tuple: pass the element through
+        result.add(longer.getElement(i));
+        continue;
+      }
+      double left = Double.parseDouble(longer.getElement(i));
+      double right = Double.parseDouble(shorter.getElement(i - gap));
+      result.add(String.valueOf(multiply ? left * right : left + right));
+    }
+    return result;
+  }
+
+  /**
+   * Runs this operator.
+   *
+   * @param input one iterator of tuples per input tuple list
+   * @return one iterator of tuples per output tuple list
+   */
+  public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
new file mode 100644
index 0000000..b5d94f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Simple holder pairing a stat name with its value tuple and timestamp tuple.
+ */
+public class Stat {
+  String _name;
+  Tuple<String> _value;
+  Tuple<String> _timestamp;
+
+  /**
+   * @param name      name of the stat
+   * @param value     tuple holding the stat's value(s)
+   * @param timestamp tuple holding the stat's timestamp(s)
+   */
+  public Stat(String name, Tuple<String> value, Tuple<String> timestamp) {
+    this._name = name;
+    this._value = value;
+    this._timestamp = timestamp;
+  }
+
+  /** @return the stat name given at construction */
+  public String getName() {
+    return this._name;
+  }
+
+  /** @return the value tuple given at construction */
+  public Tuple<String> getValue() {
+    return this._value;
+  }
+
+  /** @return the timestamp tuple given at construction */
+  public Tuple<String> getTimestamp() {
+    return this._timestamp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
new file mode 100644
index 0000000..da45fca
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
@@ -0,0 +1,358 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.apache.helix.model.PersistentStats;
+import org.apache.log4j.Logger;
+
+
+public class StatsHolder
+{
+ enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
+
+ private static final Logger logger = Logger.getLogger(StatsHolder.class
+ .getName());
+
+ public static final String VALUE_NAME = "value";
+ public static final String TIMESTAMP_NAME = "TimeStamp";
+
+ HelixDataAccessor _accessor;
+ HealthDataCache _cache;
+
+ Map<String, Map<String, String>> _statMap;
+ Map<String, Map<String, MatchResult>> _statAlertMatchResult;
+
+ private Builder _keyBuilder;
+ // PersistentStats _persistentStats;
+
+ /**
+  * Builds a holder on top of the manager's data accessor and the given health
+  * data cache, loading the initial stat map from that cache.
+  *
+  * @param manager source of the data accessor and the cluster name
+  * @param cache   health data cache the stats are read from
+  */
+ public StatsHolder(HelixManager manager, HealthDataCache cache)
+ {
+   _accessor = manager.getHelixDataAccessor();
+   _cache = cache;
+   String clusterName = manager.getClusterName();
+   _keyBuilder = new PropertyKey.Builder(clusterName);
+   // populates _statMap from the cache
+   updateCache(cache);
+   _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
+ }
+
+ /**
+  * Refreshes the health data cache through the data accessor, then reloads
+  * the in-memory stat map from the refreshed cache.
+  */
+ public void refreshStats()
+ {
+ logger.info("Refreshing cached stats");
+ _cache.refresh(_accessor);
+ updateCache(_cache);
+ }
+
+ /**
+  * Writes the in-memory stat map into the persistent stats property, creating
+  * the record first if none exists yet. A failed write is logged as a warning
+  * rather than being silently dropped.
+  */
+ public void persistStats()
+ {
+   // XXX: Am I using _accessor too directly here?
+   // took around 35 ms from desktop to ESV4 machine
+   PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
+   if (stats == null)
+   {
+     // TODO: fix naming of this record, if it matters
+     stats = new PersistentStats(PersistentStats.nodeName);
+   }
+   stats.getRecord().setMapFields(_statMap);
+   boolean persisted = _accessor.setProperty(_keyBuilder.persistantStat(), stats);
+   if (!persisted)
+   {
+     logger.warn("Failed to persist stats");
+   }
+ }
+
+ /**
+  * Loads the stat map from the health data cache, using an empty map when the
+  * cache holds no persistent stats, and logs how long the load took.
+  *
+  * @param refresh when true, the cache is refreshed through the data accessor
+  *                before it is read
+  */
+ public void getStatsFromCache(boolean refresh)
+ {
+   long refreshStartTime = System.currentTimeMillis();
+   if (refresh)
+   {
+     _cache.refresh(_accessor);
+   }
+   PersistentStats persistentStatRecord = _cache.getPersistentStats();
+   if (persistentStatRecord != null)
+   {
+     _statMap = persistentStatRecord.getMapFields();
+   }
+   else
+   {
+     _statMap = new HashMap<String, Map<String, String>>();
+   }
+   // report through the class logger instead of stdout, like refreshStats()
+   logger.info("Refresh stats done: " + (System.currentTimeMillis() - refreshStartTime));
+ }
+
+ /**
+  * Not implemented: always returns {@code null}.
+  * NOTE(review): callers must null-check the result; presumably meant to
+  * iterate over the stat names -- confirm intent before implementing.
+  */
+ public Iterator<String> getAllStats()
+ {
+ return null;
+ }
+
+ /**
+  * Merges an incoming stat into an existing stat. The aggregator and its
+  * arguments are derived from the stat name; the aggregator updates the
+  * existing value/timestamp tuples in place and the result is returned as a
+  * fresh field map.
+  *
+  * @param statName     name of the stat, from which the aggregator is parsed
+  * @param existingStat current fields of the stat; must contain both the
+  *                     {@link #TIMESTAMP_NAME} and {@link #VALUE_NAME} fields
+  * @param incomingStat fields of the incoming stat; a missing field is handed
+  *                     to the aggregator as a null tuple
+  * @return a new map holding the merged timestamp and value strings
+  * @throws HelixException if either map is null, if the existing stat lacks a
+  *                        timestamp or value field, or if a merged tuple is
+  *                        empty
+  */
+ public Map<String, String> mergeStats(String statName,
+     Map<String, String> existingStat, Map<String, String> incomingStat)
+     throws HelixException
+ {
+   if (existingStat == null)
+   {
+     throw new HelixException("existing stat for merge is null");
+   }
+   if (incomingStat == null)
+   {
+     throw new HelixException("incoming stat for merge is null");
+   }
+   // get agg type and arguments, then get agg object
+   String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
+   String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
+   Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
+
+   // get timestamps, values out of zk maps
+   String existingTime = existingStat.get(TIMESTAMP_NAME);
+   String existingVal = existingStat.get(VALUE_NAME);
+   String incomingTime = incomingStat.get(TIMESTAMP_NAME);
+   String incomingVal = incomingStat.get(VALUE_NAME);
+
+   // The merge result is read back from the existing tuples, so a missing
+   // existing field can never yield a result: fail with a clear error instead
+   // of a NullPointerException further down.
+   if (existingTime == null)
+   {
+     throw new HelixException("existing stat for merge has no " + TIMESTAMP_NAME + " field");
+   }
+   if (existingVal == null)
+   {
+     throw new HelixException("existing stat for merge has no " + VALUE_NAME + " field");
+   }
+
+   // parse values into tuples; absent incoming fields stay null
+   Tuple<String> existingTimeTuple = Tuple.fromString(existingTime);
+   Tuple<String> existingValueTuple = Tuple.fromString(existingVal);
+   Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
+       .fromString(incomingTime) : null;
+   Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
+       .fromString(incomingVal) : null;
+
+   // do merge
+   agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
+       incomingTimeTuple, aggArgs);
+
+   if (existingTimeTuple.size() == 0)
+   {
+     throw new HelixException("merged time tuple has size zero");
+   }
+   if (existingValueTuple.size() == 0)
+   {
+     throw new HelixException("merged value tuple has size zero");
+   }
+
+   // put merged tuples back in map
+   Map<String, String> mergedMap = new HashMap<String, String>();
+   mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
+   mergedMap.put(VALUE_NAME, existingValueTuple.toString());
+   return mergedMap;
+ }
+
+ /*
+ * Find all persisted stats this stat matches. Update those stats. An incoming
+ * stat can match multiple stats exactly (if that stat has multiple agg types)
+ * An incoming stat can match multiple wildcard stats
+ */
+
+ // need to do a time check here!
+
+ /**
+  * Applies an incoming stat to every persisted stat it matches. Exact matches
+  * are merged in place; wildcard matches may create new stats, which are
+  * collected separately and added once the traversal is finished. The match
+  * classification of each (incoming stat, persisted stat) pair is cached.
+  *
+  * @param incomingStatName name of the incoming stat
+  * @param statFields       fields of the incoming stat
+  */
+ public void applyStat(String incomingStatName, Map<String, String> statFields)
+ {
+   // TODO: consider locking stats here
+   // no refresh here: per the original author the stage has refreshed by now
+   if (!_statAlertMatchResult.containsKey(incomingStatName))
+   {
+     _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
+   }
+   Map<String, MatchResult> matchCache = _statAlertMatchResult.get(incomingStatName);
+   Map<String, Map<String, String>> newStats = new HashMap<String, Map<String, String>>();
+
+   // traverse through all persistent stats
+   for (String persistedName : _statMap.keySet())
+   {
+     boolean cached = matchCache.containsKey(persistedName);
+
+     // classify: reuse the cached verdict, otherwise try exact then wildcard
+     MatchResult match;
+     if (cached)
+     {
+       match = matchCache.get(persistedName);
+     }
+     else if (ExpressionParser.isIncomingStatExactMatch(persistedName, incomingStatName))
+     {
+       match = MatchResult.EXACTMATCH;
+     }
+     else if (ExpressionParser.isIncomingStatWildcardMatch(persistedName, incomingStatName))
+     {
+       match = MatchResult.WILDCARDMATCH;
+     }
+     else
+     {
+       match = MatchResult.NOMATCH;
+     }
+
+     // act on the verdict; NOMATCH needs no work
+     if (match == MatchResult.EXACTMATCH)
+     {
+       processExactMatch(persistedName, statFields);
+     }
+     else if (match == MatchResult.WILDCARDMATCH)
+     {
+       processWildcardMatch(incomingStatName, persistedName, statFields, newStats);
+     }
+
+     // remember a fresh verdict only after it was processed successfully
+     if (!cached)
+     {
+       matchCache.put(persistedName, match);
+     }
+   }
+   _statMap.putAll(newStats);
+ }
+
+ /**
+  * Merges the incoming fields into the persisted stat stored under key and
+  * stores the merged result under the same key.
+  *
+  * @param key        name of the persisted stat
+  * @param statFields fields of the incoming stat
+  */
+ void processExactMatch(String key, Map<String, String> statFields)
+ {
+   Map<String, String> current = _statMap.get(key);
+   // overwrites the value of an existing key; the original author notes this
+   // is fine for a hash map
+   _statMap.put(key, mergeStats(key, current, statFields));
+ }
+
+ /**
+  * Handles an incoming stat that matches a wildcarded persisted stat: derives
+  * the concrete stat name and, unless that stat is already known, creates it
+  * from an empty stat merged with the incoming fields.
+  *
+  * @param incomingStatName name of the incoming stat
+  * @param key              name of the wildcarded persisted stat
+  * @param statFields       fields of the incoming stat
+  * @param pendingAdds      receives newly created stats, so the caller's
+  *                         ongoing traversal of the stat map is not disturbed
+  */
+ void processWildcardMatch(String incomingStatName, String key,
+     Map<String, String> statFields, Map<String, Map<String, String>> pendingAdds)
+ {
+   // new key: incomingStatName with the agg type from the wildcarded stat
+   String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
+       incomingStatName);
+
+   // Already in _statMap: it is (or will be) handled as an exact match.
+   // Already added this round: recreating it would give the same value.
+   if (_statMap.containsKey(statToAdd) || pendingAdds.containsKey(statToAdd))
+   {
+     return;
+   }
+
+   pendingAdds.put(statToAdd, mergeStats(statToAdd, getEmptyStat(), statFields));
+ }
+
+ /**
+  * Refreshes the stats, then registers every base stat of the expression that
+  * is not yet known, each starting out as an empty stat.
+  * TODO (from the original): add parsing/validation of the stat here, unless
+  * that belongs in the expression holder.
+  *
+  * @param exp expression whose base stats are added
+  * @throws HelixException declared by the original signature
+  */
+ public void addStat(String exp) throws HelixException
+ {
+   refreshStats(); // get current stats
+
+   for (String stat : ExpressionParser.getBaseStats(exp))
+   {
+     if (!_statMap.containsKey(stat))
+     {
+       _statMap.put(stat, getEmptyStat()); // add new stat to map
+     }
+     else
+     {
+       logger.debug("Stat " + stat + " already exists; not adding");
+     }
+   }
+ }
+
+ /**
+  * Builds a fresh stat map holding one empty stat per distinct base stat of
+  * the expression.
+  *
+  * @param exp expression to take the base stats from
+  * @return a new map from stat name to empty stat fields
+  * @throws HelixException declared by the original signature
+  */
+ public static Map<String, Map<String, String>> parseStat(String exp)
+     throws HelixException
+ {
+   Map<String, Map<String, String>> parsed = new HashMap<String, Map<String, String>>();
+
+   for (String stat : ExpressionParser.getBaseStats(exp))
+   {
+     if (!parsed.containsKey(stat))
+     {
+       parsed.put(stat, getEmptyStat()); // add new stat to map
+     }
+     else
+     {
+       logger.debug("Stat " + stat + " already exists; not adding");
+     }
+   }
+   return parsed;
+ }
+
+
+ /**
+  * @return a new mutable map whose timestamp and value fields are both the
+  *         empty string
+  */
+ public static Map<String, String> getEmptyStat()
+ {
+   Map<String, String> emptyFields = new HashMap<String, String>();
+   emptyFields.put(TIMESTAMP_NAME, "");
+   emptyFields.put(VALUE_NAME, "");
+   return emptyFields;
+ }
+
+ /**
+  * @return a new list with one {@link Stat} per entry of the stat map, its
+  *         value and timestamp parsed into tuples
+  */
+ public List<Stat> getStatsList()
+ {
+   List<Stat> result = new LinkedList<Stat>();
+   for (Map.Entry<String, Map<String, String>> entry : _statMap.entrySet())
+   {
+     Map<String, String> fields = entry.getValue();
+     Tuple<String> values = Tuple.fromString(fields.get(VALUE_NAME));
+     Tuple<String> times = Tuple.fromString(fields.get(TIMESTAMP_NAME));
+     result.add(new Stat(entry.getKey(), values, times));
+   }
+   return result;
+ }
+
+ /**
+  * Returns the current value tuple of every stat. Timestamps are not part of
+  * the result, so they are no longer parsed here.
+  *
+  * @return a new map from stat name to its parsed value tuple
+  */
+ public Map<String, Tuple<String>> getStatsMap()
+ {
+   // don't refresh: per the original author the stage has refreshed by now
+   Map<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
+   for (String stat : _statMap.keySet())
+   {
+     Map<String, String> statFields = _statMap.get(stat);
+     stats.put(stat, Tuple.fromString(statFields.get(VALUE_NAME)));
+   }
+   return stats;
+ }
+
+ public void updateCache(HealthDataCache cache)
+ {
+ _cache = cache;
+ PersistentStats persistentStatRecord = _cache.getPersistentStats();
+ if (persistentStatRecord != null)
+ {
+ _statMap = persistentStatRecord.getMapFields();
+ }
+ else
+ {
+ _statMap = new HashMap<String, Map<String, String>>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
new file mode 100644
index 0000000..1028be3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Operator that reduces each input tuple list to a single tuple: the sum of
+ * all tuples of that list. It yields exactly one output list per input list.
+ */
+public class SumEachOperator extends Operator {
+
+  public SumEachOperator() {
+    minInputTupleLists = 1;
+    maxInputTupleLists = Integer.MAX_VALUE;
+    numOutputTupleLists = -1;
+    inputOutputTupleListsCountsEqual = true;
+  }
+
+  /**
+   * Sums every input list on its own.
+   *
+   * @param input one iterator per input list; each is consumed completely
+   * @return one single-element iterator per input list, holding that list's
+   *         sum (null when the list was empty)
+   */
+  @Override
+  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+    List<Iterator<Tuple<String>>> results = new ArrayList<Iterator<Tuple<String>>>();
+    for (Iterator<Tuple<String>> column : input) {
+      List<Tuple<String>> single = new ArrayList<Tuple<String>>();
+      single.add(sumAll(column));
+      results.add(single.iterator());
+    }
+    return results;
+  }
+
+  /** Drains the iterator, folding its tuples with sumTuples; null if empty. */
+  private Tuple<String> sumAll(Iterator<Tuple<String>> tuples) {
+    Tuple<String> total = null;
+    while (tuples.hasNext()) {
+      total = sumTuples(total, tuples.next());
+    }
+    return total;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
new file mode 100644
index 0000000..2b6cd89
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Operator that sums its input tuple lists row by row into one output list:
+ * output row i is the sum of row i of every input list.
+ */
+public class SumOperator extends Operator {
+
+  public SumOperator() {
+    minInputTupleLists = 1;
+    maxInputTupleLists = Integer.MAX_VALUE;
+    inputOutputTupleListsCountsEqual = false;
+    numOutputTupleLists = 1;
+  }
+
+  /**
+   * Wraps a list of tuples as a one-element list of iterators.
+   *
+   * @param input tuples to expose
+   * @return a new list holding a single iterator over input
+   */
+  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input)
+  {
+    // fully parameterized; the raw List/ArrayList used before caused an
+    // unchecked conversion on return
+    List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
+    out.add(input.iterator());
+    return out;
+  }
+
+  /**
+   * Sums the input lists row by row.
+   *
+   * @param input one iterator per input list; null or empty yields an empty
+   *              result list
+   * @return a single iterator over the row sums. Processing stops as soon as
+   *         any input runs out; the row in progress at that point is dropped,
+   *         although inputs visited earlier in that row were already advanced
+   */
+  @Override
+  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
+    if (input == null || input.isEmpty()) {
+      return singleSetToIter(output);
+    }
+    while (true) { // one pass per row; exits when any iterator is exhausted
+      Tuple<String> rowSum = null;
+      for (Iterator<Tuple<String>> it : input) {
+        if (!it.hasNext()) { // when any iterator runs out, we are done
+          return singleSetToIter(output);
+        }
+        rowSum = sumTuples(rowSum, it.next());
+      }
+      output.add(rowSum);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
new file mode 100644
index 0000000..2526633
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+/**
+ * Ordered, growable sequence of elements with a comma-separated string form.
+ *
+ * @param <T> element type
+ */
+public class Tuple<T> {
+  List<T> elements;
+
+  /** Creates an empty tuple. */
+  public Tuple()
+  {
+    elements = new ArrayList<T>();
+  }
+
+  /** @return number of elements */
+  public int size()
+  {
+    return elements.size();
+  }
+
+  /** Appends one element. */
+  public void add(T entry)
+  {
+    elements.add(entry);
+  }
+
+  /** Appends all elements of another tuple, in order. */
+  public void addAll(Tuple<T> incoming)
+  {
+    elements.addAll(incoming.getElements());
+  }
+
+  /** @return an iterator over the elements, first to last */
+  public Iterator<T> iterator()
+  {
+    return elements.listIterator();
+  }
+
+  /**
+   * @param ind zero-based position
+   * @return the element at that position
+   * @throws IndexOutOfBoundsException if ind is out of range
+   */
+  public T getElement(int ind)
+  {
+    return elements.get(ind);
+  }
+
+  /** @return the backing list itself (not a copy) */
+  public List<T> getElements()
+  {
+    return elements;
+  }
+
+  /** Removes all elements. */
+  public void clear()
+  {
+    elements.clear();
+  }
+
+  /**
+   * Parses a comma-separated string into a tuple of strings.
+   *
+   * @param in text to parse; null and the empty string both produce an empty
+   *           tuple
+   * @return a new tuple with one element per comma-separated token, as
+   *         produced by {@link String#split(String)}
+   */
+  public static Tuple<String> fromString(String in)
+  {
+    Tuple<String> tup = new Tuple<String>();
+    // null is treated like "": callers pass unchecked map lookups here
+    if (in == null || in.length() == 0) {
+      return tup;
+    }
+    for (String element : in.split(",")) {
+      tup.add(element);
+    }
+    return tup;
+  }
+
+  /** @return the elements joined by commas; "" for an empty tuple */
+  @Override
+  public String toString()
+  {
+    StringBuilder out = new StringBuilder();
+    Iterator<T> it = iterator();
+    boolean outEmpty = true;
+    while (it.hasNext()) {
+      if (!outEmpty) {
+        out.append(",");
+      }
+      out.append(it.next());
+      outEmpty = false;
+    }
+    return out.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
new file mode 100644
index 0000000..a60605e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+/**
+ * Aggregator that keeps a sliding window of the most recent values: current
+ * and new entries are concatenated and only the last windowSize entries are
+ * retained.
+ */
+public class WindowAggregator extends Aggregator {
+
+  int _windowSize;
+
+  /**
+   * @param windowSize window size as a decimal integer string
+   */
+  public WindowAggregator(String windowSize)
+  {
+    _windowSize = Integer.parseInt(windowSize);
+    _numArgs = 1;
+  }
+
+  /** Creates an aggregator with a window size of 1. */
+  public WindowAggregator()
+  {
+    this("1");
+  }
+
+  /**
+   * Merges the new tuples into the current tuples in place.
+   *
+   * @param currValTup  current values; overwritten with the merged window
+   * @param newValTup   incoming values
+   * @param currTimeTup current timestamps; overwritten with the merged window
+   * @param newTimeTup  incoming timestamps
+   * @param args        args[0] is the window size; it also replaces this
+   *                    instance's stored window size
+   */
+  @Override
+  public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+      Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+
+    _windowSize = Integer.parseInt(args[0]);
+
+    Tuple<String> keptTimes = new Tuple<String>();
+    Tuple<String> keptVals = new Tuple<String>();
+
+    Iterator<String> oldTimes = currTimeTup.iterator();
+    Iterator<String> oldVals = currValTup.iterator();
+    Iterator<String> freshTimes = newTimeTup.iterator();
+    Iterator<String> freshVals = newValTup.iterator();
+
+    // number of leading entries that fall out of the window:
+    // total size of both tuples minus the window size
+    final int evictCount = newTimeTup.size() + currTimeTup.size() - _windowSize;
+
+    int seen = 0;
+    double lastCurrTime = -1;
+    // traverse current entries, keeping those past the eviction mark
+    while (oldTimes.hasNext()) {
+      lastCurrTime = Double.parseDouble(oldTimes.next());
+      double val = Double.parseDouble(oldVals.next());
+      seen++;
+      if (seen > evictCount) {
+        keptTimes.add(String.valueOf(lastCurrTime));
+        keptVals.add(String.valueOf(val));
+      }
+    }
+
+    // traverse new entries
+    while (freshTimes.hasNext()) {
+      double val = Double.parseDouble(freshVals.next());
+      double time = Double.parseDouble(freshTimes.next());
+      if (time <= lastCurrTime) {
+        // a new entry is not newer than the newest current entry: reject the
+        // whole incoming tuple and leave the current tuples untouched
+        return;
+      }
+      seen++;
+      if (seen > evictCount) {
+        keptTimes.add(String.valueOf(time));
+        keptVals.add(String.valueOf(val));
+      }
+    }
+
+    // publish the merged window through the current tuples
+    // TODO: see if we can do merger in place on curr
+    currTimeTup.clear();
+    currTimeTup.addAll(keptTimes);
+    currValTup.clear();
+    currValTup.addAll(keptVals);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
new file mode 100644
index 0000000..5ff2e7d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Classes for Helix alerts
+ */
+package org.apache.helix.alerts;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java b/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
new file mode 100644
index 0000000..21b6a4e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.CurrentState.CurrentStateProperty;
+import org.apache.log4j.Logger;
+
+
+/*
+ * ZKRoutingInfoProvider keeps a copy of the routing table. Given a partition id,
+ * it will return
+ *
+ * 1. The list of partition that can be read
+ * 2. the master partition, for write operation
+ *
+ * The routing table is constructed from the currentState of each storage nodes.
+ * The current state is a list of following pairs: partition-id:State(MASTER / SLAVE)
+ *
+ * TODO: move the code as part of router process
+ * TODO: add listeners to node current state changes
+ * */
+public class ExternalViewGenerator
+{
+ static Logger _logger = Logger.getLogger(ExternalViewGenerator.class);
+
+ /*
+ * Given a list of external view ZNRecord nodes(one for each cluster),
+ * calculate the routing map.
+ *
+ * The format of the routing map is like this:
+ *
+ * Map<String, Map<String, Set<String>>> maps from a partitionName to its
+ * states Map<String, List<String>> The second Map maps from a state
+ * ("MASTER", "SLAVE"...) to a list of nodeNames
+ *
+ * So that the we can query the map for the list of nodes by providing the
+ * partition name and the expected state.
+ */
+ public Map<String, Map<String, Set<String>>> getRouterMapFromExternalView(
+ List<ZNRecord> dbExternalViewList)
+ {
+ Map<String, Map<String, Set<String>>> result = new TreeMap<String, Map<String, Set<String>>>();
+
+ for (ZNRecord dbNodeView : dbExternalViewList)
+ {
+ Map<String, Map<String, String>> dbNodeStateMap = dbNodeView
+ .getMapFields();
+ for (String partitionId : dbNodeStateMap.keySet())
+ {
+ if (!result.containsKey(partitionId))
+ {
+ result.put(partitionId, new TreeMap<String, Set<String>>());
+ }
+ Map<String, String> nodeStateMap = dbNodeStateMap.get(partitionId);
+ for (String nodeName : nodeStateMap.keySet())
+ {
+ String state = nodeStateMap.get(nodeName);
+ if (!result.get(partitionId).containsKey(state))
+ {
+ result.get(partitionId).put(state, new TreeSet<String>());
+ }
+ result.get(partitionId).get(state).add(nodeName);
+ }
+ }
+ }
+ return result;
+ }
+
+ /*
+ * The parameter is a map that maps the nodeName to a list of ZNRecords.
+ */
+ public List<ZNRecord> computeExternalView(
+ Map<String, List<ZNRecord>> currentStates, List<ZNRecord> idealStates)
+ {
+ List<ZNRecord> resultList = new ArrayList<ZNRecord>();
+ Map<String, ZNRecord> resultRoutingTable = new HashMap<String, ZNRecord>();
+ // maps from dbName to another map : partition -> map <nodename,
+ // master/slave>;
+ // Fill the routing table with "empty" default state according to ideals
+ // states
+ // in the cluster
+ if (idealStates != null)
+ {
+ for (ZNRecord idealState : idealStates)
+ {
+ ZNRecord defaultDBExternalView = new ZNRecord(idealState.getId());
+ resultRoutingTable.put(idealState.getId(), defaultDBExternalView);
+ }
+ } else
+ {
+ assert (!currentStates.isEmpty());
+ return resultList;
+ }
+ for (String nodeName : currentStates.keySet())
+ {
+ List<ZNRecord> zndbStates = currentStates.get(nodeName);
+ for (ZNRecord dbNodeStateRecord : zndbStates)
+ {
+ Map<String, Map<String, String>> dbStates = dbNodeStateRecord
+ .getMapFields();
+ for (String stateUnitKey : dbStates.keySet())
+ {
+ Map<String, String> dbPartitionStates = dbStates.get(stateUnitKey);
+ String dbName = dbPartitionStates
+ .get(Message.Attributes.RESOURCE_NAME.toString());
+ ZNRecord partitionStatus = resultRoutingTable.get(dbName);
+ if (partitionStatus == null)
+ {
+ partitionStatus = new ZNRecord(dbName);
+ resultRoutingTable.put(dbName, partitionStatus);
+ }
+ String currentStateKey = CurrentStateProperty.CURRENT_STATE.toString();
+
+ if (!partitionStatus.getMapFields().containsKey(stateUnitKey))
+ {
+ partitionStatus.setMapField(stateUnitKey,
+ new TreeMap<String, String>());
+ }
+ partitionStatus.getMapField(stateUnitKey).put(nodeName,
+ dbPartitionStates.get(currentStateKey));
+
+ }
+ }
+ }
+ for (ZNRecord record : resultRoutingTable.values())
+ {
+ resultList.add(record);
+ }
+ return resultList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
new file mode 100644
index 0000000..8d82c2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -0,0 +1,608 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.PipelineRegistry;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CompatibilityCheckStage;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ExternalViewComputeStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Cluster Controllers main goal is to keep the cluster state as close as possible to
+ * Ideal State. It does this by listening to changes in cluster state and scheduling new
+ * tasks to get cluster state to best possible ideal state. Every instance of this class
+ * can control only one cluster.
+ *
+ *
+ * Get all the partitions use IdealState, CurrentState and Messages <br>
+ * foreach partition <br>
+ * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
+ * 2. compute best possible state (instance,state) pair. This needs previous step data and
+ * state model constraints <br>
+ * 3. compute the messages/tasks needed to move from 1 to 2 <br>
+ * 4. select the messages that can be sent, needs messages and state model constraints <br>
+ * 5. send messages
+ */
+public class GenericHelixController implements
+ ConfigChangeListener,
+ IdealStateChangeListener,
+ LiveInstanceChangeListener,
+ MessageListener,
+ CurrentStateChangeListener,
+ ExternalViewChangeListener,
+ ControllerChangeListener,
+ HealthStateChangeListener
+{
+ private static final Logger logger =
+ Logger.getLogger(GenericHelixController.class.getName());
+ volatile boolean init = false;
+ private final PipelineRegistry _registry;
+
+ /**
+ * Since instance current state is per-session-id, we need to track the session-ids of
+ * the current states that the ClusterController is observing. this set contains all the
+ * session ids that we add currentState listener
+ */
+ private final Set<String> _instanceCurrentStateChangeSubscriptionSessionIds;
+
+ /**
+ * this set contains all the instance names that we add message listener
+ */
+ private final Set<String> _instanceSubscriptionNames;
+
+ ClusterStatusMonitor _clusterStatusMonitor;
+
+
+ /**
+ * The _paused flag is checked by handleEvent(): while the flag is set, handleEvent()
+ * is a no-op. All other event handling logic stays the same when the flag
+ * is set.
+ */
+ private boolean _paused;
+
+ /**
+ * The timer that can periodically run the rebalancing pipeline. The timer is started if at
+ * least one resource group is configured to use the timer.
+ */
+ Timer _rebalanceTimer = null;
+ int _timerPeriod = Integer.MAX_VALUE;
+
+ /**
+ * Default constructor that creates a default pipeline registry. This is sufficient in
+ * most cases; if something more specific is needed, use the other constructor,
+ * which lets you pass in a pipeline registry.
+ */
+ public GenericHelixController()
+ {
+ this(createDefaultRegistry());
+ }
+
+ class RebalanceTask extends TimerTask
+ {
+ HelixManager _manager;
+
+ public RebalanceTask(HelixManager manager)
+ {
+ _manager = manager;
+ }
+
+ @Override
+ public void run()
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ ClusterEvent event = new ClusterEvent("periodicalRebalance");
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("changeContext", changeContext);
+ List<ZNRecord> dummy = new ArrayList<ZNRecord>();
+ event.addAttribute("eventData", dummy);
+ // Should be able to process
+ handleEvent(event);
+ }
+ }
+
+ /**
+ * Starts the rebalancing timer with the specified period. Start the timer if necessary;
+ * If the period is smaller than the current period, cancel the current timer and use
+ * the new period.
+ */
+ void startRebalancingTimer(int period, HelixManager manager)
+ {
+ logger.info("Controller starting timer at period " + period);
+ if(period < _timerPeriod)
+ {
+ if(_rebalanceTimer != null)
+ {
+ _rebalanceTimer.cancel();
+ }
+ _rebalanceTimer = new Timer(true);
+ _timerPeriod = period;
+ _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
+ }
+ else
+ {
+ logger.info("Controller already has timer at period " + _timerPeriod);
+ }
+ }
+
+ /**
+ * Stops the rebalancing timer (if one is running) and resets the timer period
+ */
+ void stopRebalancingTimer()
+ {
+ if(_rebalanceTimer != null)
+ {
+ _rebalanceTimer.cancel();
+ _rebalanceTimer = null;
+ }
+ _timerPeriod = Integer.MAX_VALUE;
+ }
+
+ private static PipelineRegistry createDefaultRegistry()
+ {
+ logger.info("createDefaultRegistry");
+ synchronized (GenericHelixController.class)
+ {
+ PipelineRegistry registry = new PipelineRegistry();
+
+ // cluster data cache refresh
+ Pipeline dataRefresh = new Pipeline();
+ dataRefresh.addStage(new ReadClusterDataStage());
+
+ // rebalance pipeline
+ Pipeline rebalancePipeline = new Pipeline();
+ rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new CurrentStateComputationStage());
+ rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new MessageThrottleStage());
+ rebalancePipeline.addStage(new TaskAssignmentStage());
+
+ // external view generation
+ Pipeline externalViewPipeline = new Pipeline();
+ externalViewPipeline.addStage(new ExternalViewComputeStage());
+
+ // backward compatibility check
+ Pipeline liveInstancePipeline = new Pipeline();
+ liveInstancePipeline.addStage(new CompatibilityCheckStage());
+
+ registry.register("idealStateChange", dataRefresh, rebalancePipeline);
+ registry.register("currentStateChange",
+ dataRefresh,
+ rebalancePipeline,
+ externalViewPipeline);
+ registry.register("configChange", dataRefresh, rebalancePipeline);
+ registry.register("liveInstanceChange",
+ dataRefresh,
+ liveInstancePipeline,
+ rebalancePipeline,
+ externalViewPipeline);
+
+ registry.register("messageChange",
+ dataRefresh,
+ rebalancePipeline);
+ registry.register("externalView", dataRefresh);
+ registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
+ registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
+
+ // health stats pipeline
+ // Pipeline healthStatsAggregationPipeline = new Pipeline();
+ // StatsAggregationStage statsStage = new StatsAggregationStage();
+ // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+ // healthStatsAggregationPipeline.addStage(statsStage);
+ // registry.register("healthChange", healthStatsAggregationPipeline);
+
+ return registry;
+ }
+ }
+
+ public GenericHelixController(PipelineRegistry registry)
+ {
+ _paused = false;
+ _registry = registry;
+ _instanceCurrentStateChangeSubscriptionSessionIds =
+ new ConcurrentSkipListSet<String>();
+ _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
+ // _externalViewGenerator = new ExternalViewGenerator();
+ }
+
+ /**
+ * lock-always: caller always needs to obtain an external lock before call, calls to
+ * handleEvent() should be serialized
+ *
+ * @param event
+ */
+ protected synchronized void handleEvent(ClusterEvent event)
+ {
+ HelixManager manager = event.getAttribute("helixmanager");
+ if (manager == null)
+ {
+ logger.error("No cluster manager in event:" + event.getName());
+ return;
+ }
+
+ if (!manager.isLeader())
+ {
+ logger.error("Cluster manager: " + manager.getInstanceName()
+ + " is not leader. Pipeline will not be invoked");
+ return;
+ }
+
+ if (_paused)
+ {
+ logger.info("Cluster is paused. Ignoring the event:" + event.getName());
+ return;
+ }
+
+ NotificationContext context = null;
+ if (event.getAttribute("changeContext") != null)
+ {
+ context = (NotificationContext) (event.getAttribute("changeContext"));
+ }
+
+ // Initialize _clusterStatusMonitor
+ if (context != null)
+ {
+ if (context.getType() == Type.FINALIZE)
+ {
+ if (_clusterStatusMonitor != null)
+ {
+ _clusterStatusMonitor.reset();
+ _clusterStatusMonitor = null;
+ }
+
+ stopRebalancingTimer();
+ logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
+ return;
+ }
+ else
+ {
+ if (_clusterStatusMonitor == null)
+ {
+ _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
+ }
+
+ event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
+ }
+ }
+
+ List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
+ if (pipelines == null || pipelines.size() == 0)
+ {
+ logger.info("No pipeline to run for event:" + event.getName());
+ return;
+ }
+
+ for (Pipeline pipeline : pipelines)
+ {
+ try
+ {
+ pipeline.handle(event);
+ pipeline.finish();
+ }
+ catch (Exception e)
+ {
+ logger.error("Exception while executing pipeline: " + pipeline
+ + ". Will not continue to next pipeline", e);
+ break;
+ }
+ }
+ }
+
+ // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
+ // callback
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+// logger.info("START: GenericClusterController.onExternalViewChange()");
+// ClusterEvent event = new ClusterEvent("externalViewChange");
+// event.addAttribute("helixmanager", changeContext.getManager());
+// event.addAttribute("changeContext", changeContext);
+// event.addAttribute("eventData", externalViewList);
+// // handleEvent(event);
+// logger.info("END: GenericClusterController.onExternalViewChange()");
+ }
+
+ @Override
+ public void onStateChange(String instanceName,
+ List<CurrentState> statesInfo,
+ NotificationContext changeContext)
+ {
+ logger.info("START: GenericClusterController.onStateChange()");
+ ClusterEvent event = new ClusterEvent("currentStateChange");
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("instanceName", instanceName);
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("eventData", statesInfo);
+ handleEvent(event);
+ logger.info("END: GenericClusterController.onStateChange()");
+ }
+
+ @Override
+ public void onHealthChange(String instanceName,
+ List<HealthStat> reports,
+ NotificationContext changeContext)
+ {
+ /**
+ * When there are many participants (> 20, possibly hundreds), this callback can be
+ * called quite frequently, as each participant reports health stats every minute. Thus
+ * we changed the health check pipeline to run in a timer callback.
+ */
+ }
+
+ @Override
+ public void onMessage(String instanceName,
+ List<Message> messages,
+ NotificationContext changeContext)
+ {
+ logger.info("START: GenericClusterController.onMessage()");
+
+ ClusterEvent event = new ClusterEvent("messageChange");
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("instanceName", instanceName);
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("eventData", messages);
+ handleEvent(event);
+
+ if (_clusterStatusMonitor != null && messages != null)
+ {
+ _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
+ }
+
+ logger.info("END: GenericClusterController.onMessage()");
+ }
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext)
+ {
+ logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+ if (liveInstances == null)
+ {
+ liveInstances = Collections.emptyList();
+ }
+ // Go through the live instance list and make sure that we are observing them
+ // accordingly. The action is done regardless of the paused flag.
+ if (changeContext.getType() == NotificationContext.Type.INIT ||
+ changeContext.getType() == NotificationContext.Type.CALLBACK)
+ {
+ checkLiveInstancesObservation(liveInstances, changeContext);
+ }
+
+ ClusterEvent event = new ClusterEvent("liveInstanceChange");
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("eventData", liveInstances);
+ handleEvent(event);
+ logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
+ }
+
+ void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
+ {
+ if (manager.getConfigAccessor() == null)
+ {
+ logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
+ return;
+ }
+
+ for(IdealState idealState : idealStates)
+ {
+ int period = idealState.getRebalanceTimerPeriod();
+ if(period > 0)
+ {
+ startRebalancingTimer(period, manager);
+ }
+ }
+ }
+
+ @Override
+ public void onIdealStateChange(List<IdealState> idealStates,
+ NotificationContext changeContext)
+ {
+ logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+ ClusterEvent event = new ClusterEvent("idealStateChange");
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("eventData", idealStates);
+ handleEvent(event);
+
+ if(changeContext.getType() != Type.FINALIZE)
+ {
+ checkRebalancingTimer(changeContext.getManager(), idealStates);
+ }
+
+ logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+ }
+
+ @Override
+ public void onConfigChange(List<InstanceConfig> configs,
+ NotificationContext changeContext)
+ {
+ logger.info("START: GenericClusterController.onConfigChange()");
+ ClusterEvent event = new ClusterEvent("configChange");
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("eventData", configs);
+ handleEvent(event);
+ logger.info("END: GenericClusterController.onConfigChange()");
+ }
+
+ @Override
+ public void onControllerChange(NotificationContext changeContext)
+ {
+ logger.info("START: GenericClusterController.onControllerChange()");
+ HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+
+ // double check if this controller is the leader
+ Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader =
+ accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null)
+ {
+ logger.warn("No controller exists for cluster:"
+ + changeContext.getManager().getClusterName());
+ return;
+ }
+ else
+ {
+ String leaderName = leader.getInstanceName();
+
+ String instanceName = changeContext.getManager().getInstanceName();
+ if (leaderName == null || !leaderName.equals(instanceName))
+ {
+ logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
+ + leader);
+ return;
+ }
+ }
+
+ PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
+ if (pauseSignal != null)
+ {
+ _paused = true;
+ logger.info("controller is now paused");
+ }
+ else
+ {
+ if (_paused)
+ {
+ // it is currently paused
+ logger.info("controller is now resumed");
+ _paused = false;
+ ClusterEvent event = new ClusterEvent("resume");
+ event.addAttribute("changeContext", changeContext);
+ event.addAttribute("helixmanager", changeContext.getManager());
+ event.addAttribute("eventData", pauseSignal);
+ handleEvent(event);
+ }
+ else
+ {
+ _paused = false;
+ }
+ }
+ logger.info("END: GenericClusterController.onControllerChange()");
+ }
+
+ /**
+ * Go through the list of liveinstances in the cluster, and add currentstateChange
+ * listener and Message listeners to them if they are newly added. For current state
+ * change, the observation is tied to the session id of each live instance.
+ *
+ */
+ protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
+ NotificationContext changeContext)
+ {
+ for (LiveInstance instance : liveInstances)
+ {
+ String instanceName = instance.getId();
+ String clientSessionId = instance.getSessionId();
+ HelixManager manager = changeContext.getManager();
+
+ // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
+ // that we've added a currentState listener
+ if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
+ {
+ try
+ {
+ manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
+ _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
+ logger.info("Observing client session id: " + clientSessionId);
+ }
+ catch (Exception e)
+ {
+ logger.error("Exception adding current state and message listener for instance:"
+ + instanceName,
+ e);
+ }
+ }
+
+ // _instanceSubscriptionNames contains all the instanceNames that we've added a
+ // message listener
+ if (!_instanceSubscriptionNames.contains(instanceName))
+ {
+ try
+ {
+ logger.info("Adding message listener for " + instanceName);
+ manager.addMessageListener(this, instanceName);
+ _instanceSubscriptionNames.add(instanceName);
+ }
+ catch (Exception e)
+ {
+ logger.error("Exception adding message listener for instance:" + instanceName,
+ e);
+ }
+ }
+
+ // TODO we need to remove currentState listeners and message listeners
+ // when a session or an instance no longer exists. This may happen
+ // in case of session expiry, participant rebound, participant goes and new
+ // participant comes
+
+ // TODO shi should call removeListener on the previous session id;
+ // but the removeListener with that functionality is not implemented yet
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
new file mode 100644
index 0000000..1bc5cad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+/**
+ * start cluster manager controller
+ * cluster manager controller has two modes:
+ * 1) stand-alone mode: in this mode each controller gets a list of clusters
+ * and competes via leader election to become the controller for any of the clusters.
+ * if a controller fails to become the leader of a given cluster, it remains as a standby
+ * and re-does the leader election when the current leader fails
+ *
+ * 2) distributed mode: in this mode each controller first joins as participant into
+ * a special CONTROLLER_CLUSTER. Leader election happens in this special
+ * cluster. The one that becomes the leader controls all controllers (including itself)
+ * to become leaders of other clusters.
+ */
+
+import java.util.Arrays;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class HelixControllerMain
+{
+ public static final String zkServerAddress = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String help = "help";
+ public static final String mode = "mode";
+ public static final String propertyTransferServicePort = "propertyTransferPort";
+ public static final String name = "controllerName";
+ public static final String STANDALONE = "STANDALONE";
+ public static final String DISTRIBUTED = "DISTRIBUTED";
+ private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
+
+ // hack: OptionBuilder is not thread safe
+ @SuppressWarnings("static-access")
+ synchronized private static Options constructCommandLineOptions()
+ {
+ Option helpOption = OptionBuilder.withLongOpt(help)
+ .withDescription("Prints command-line options info").create();
+
+ Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
+ .withDescription("Provide zookeeper address").create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption = OptionBuilder.withLongOpt(cluster)
+ .withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option modeOption = OptionBuilder
+ .withLongOpt(mode)
+ .withDescription(
+ "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
+ .create();
+ modeOption.setArgs(1);
+ modeOption.setRequired(false);
+ modeOption.setArgName("Cluster controller mode (Optional)");
+
+ Option controllerNameOption = OptionBuilder.withLongOpt(name)
+ .withDescription("Provide cluster controller name (Optional)").create();
+ controllerNameOption.setArgs(1);
+ controllerNameOption.setRequired(false);
+ controllerNameOption.setArgName("Cluster controller name (Optional)");
+
+ Option portOption = OptionBuilder
+ .withLongOpt(propertyTransferServicePort)
+ .withDescription(
+ "Webservice port for ZkProperty controller transfer")
+ .create();
+ portOption.setArgs(1);
+ portOption.setRequired(false);
+ portOption.setArgName("Cluster controller property transfer port (Optional)");
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(zkServerOption);
+ options.addOption(clusterOption);
+ options.addOption(modeOption);
+ options.addOption(portOption);
+ options.addOption(controllerNameOption);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
+ {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+
+ try
+ {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe)
+ {
+ logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static void addListenersToController(HelixManager manager,
+ GenericHelixController controller)
+ {
+ try
+ {
+ manager.addConfigChangeListener(controller);
+ manager.addLiveInstanceChangeListener(controller);
+ manager.addIdealStateChangeListener(controller);
+ manager.addExternalViewChangeListener(controller);
+ manager.addControllerListener(controller);
+ } catch (ZkInterruptedException e)
+ {
+ logger
+ .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
+ + e);
+ } catch (Exception e)
+ {
+ logger.error("Error when creating HelixManagerContollerMonitor", e);
+ }
+ }
+
+ /**
+  * Creates a HelixManager for the requested controller mode and connects it.
+  * STANDALONE yields a CONTROLLER manager; DISTRIBUTED yields a CONTROLLER_PARTICIPANT
+  * manager with the "LeaderStandby" state model factory registered before connecting.
+  * The mode is matched case-insensitively. Failures are logged rather than thrown.
+  *
+  * @return the manager; null for an unsupported (or null) mode, and possibly not
+  *         connected if an exception was raised while starting it
+  */
+ public static HelixManager startHelixController(final String zkConnectString,
+     final String clusterName, final String controllerName, final String controllerMode)
+ {
+   HelixManager manager = null;
+   try
+   {
+     if (STANDALONE.equalsIgnoreCase(controllerMode))
+     {
+       manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+           InstanceType.CONTROLLER, zkConnectString);
+       manager.connect();
+     } else if (DISTRIBUTED.equalsIgnoreCase(controllerMode))
+     {
+       manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+           InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
+       DistClusterControllerStateModelFactory stateModelFactory =
+           new DistClusterControllerStateModelFactory(zkConnectString);
+       StateMachineEngine stateMach = manager.getStateMachineEngine();
+       stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
+       manager.connect();
+     } else
+     {
+       // No exception is thrown here; the caller receives a null manager.
+       logger.error("cluster controller mode:" + controllerMode + " NOT supported");
+     }
+   } catch (Exception e)
+   {
+     // Report through the logger (with stack trace) instead of printing to stderr.
+     logger.error("Failed to start cluster controller:" + controllerName + ", cluster:" + clusterName + ", mode:" + controllerMode, e);
+   }
+
+   return manager;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+   // Entry point: parses the command line, initializes the ZK property transfer server
+   // when a positive port is given, starts the controller, and blocks until interrupted.
+   // Original design notes: read the config; check if this process is the master and wait
+   // indefinitely. The other approach is to always process the events but, when updating
+   // zk, check if this is the master; this is difficult to get right. Get the clusters to
+   // manage; for each cluster create a manager and add the respective listeners.
+   CommandLine cmd = processCommandLineArgs(args);
+   String zkConnectString = cmd.getOptionValue(zkServerAddress);
+   String clusterName = cmd.getOptionValue(cluster);
+   String controllerMode = STANDALONE;
+   String controllerName = null;
+   int propertyTransServicePort = -1;
+
+   if (cmd.hasOption(mode))
+   {
+     controllerMode = cmd.getOptionValue(mode);
+   }
+
+   if (cmd.hasOption(propertyTransferServicePort))
+   {
+     propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
+   }
+   if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
+   {
+     throw new IllegalArgumentException("A unique cluster controller name is required in DISTRIBUTED mode");
+   }
+
+   controllerName = cmd.getOptionValue(name);
+
+   // Espresso_driver.py will consume this
+   logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
+       + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
+
+   if (propertyTransServicePort > 0)
+   {
+     ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
+   }
+
+   HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
+       controllerMode);
+   try
+   {
+     Thread.currentThread().join();
+   }
+   catch (InterruptedException e)
+   {
+     logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
+         + " interrupted");
+   }
+   finally
+   {
+     // startHelixController() can return null; guard so the server shutdown still runs.
+     if (manager != null)
+     {
+       manager.disconnect();
+     }
+     ZKPropertyTransferServer.getInstance().shutdown();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
new file mode 100644
index 0000000..ec6f3f6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.io.FileFilter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Generic class that will read the data given the root path.
+ *
+ * @author kgopalak
+ *
+ */
+public class HierarchicalDataHolder<T>
+{
+ private static final Logger logger = Logger
+ .getLogger(HierarchicalDataHolder.class.getName());
+ AtomicReference<Node<T>> root;
+
+ /**
+ * currentVersion, gets updated when data is read from original source
+ */
+ AtomicLong currentVersion;
+ private final ZkClient _zkClient;
+ private final String _rootPath;
+ private final FileFilter _filter;
+
+ public HierarchicalDataHolder(ZkClient client, String rootPath,
+ FileFilter filter)
+ {
+ this._zkClient = client;
+ this._rootPath = rootPath;
+ this._filter = filter;
+ // Node<T> initialValue = new Node<T>();
+ root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
+ currentVersion = new AtomicLong(1);
+ refreshData();
+ }
+
+ public long getVersion()
+ {
+ return currentVersion.get();
+ }
+
+ public boolean refreshData()
+ {
+ Node<T> newRoot = new Node<T>();
+ boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
+ if (dataChanged)
+ {
+ currentVersion.getAndIncrement();
+ root.set(newRoot);
+ return true;
+ } else
+ {
+ return false;
+ }
+ }
+
+ // private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
+ // newRoot,Stat newStat, String path)
+ private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
+ String path)
+ {
+ boolean dataChanged = false;
+ Stat newStat = _zkClient.getStat(path);
+ Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
+ newRoot.name = path;
+ if (newStat != null)
+ {
+ if (oldStat == null)
+ {
+ newRoot.stat = newStat;
+ newRoot.data = _zkClient.<T> readData(path, true);
+ dataChanged = true;
+ } else if (newStat.equals(oldStat))
+ {
+ newRoot.stat = oldStat;
+ newRoot.data = oldRoot.data;
+ } else
+ {
+ dataChanged = true;
+ newRoot.stat = newStat;
+ newRoot.data = _zkClient.<T> readData(path, true);
+ }
+ if (newStat.getNumChildren() > 0)
+ {
+ List<String> children = _zkClient.getChildren(path);
+ for (String child : children)
+ {
+ String newPath = path + "/" + child;
+ Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
+ .get(child) : null;
+ if (newRoot.children == null)
+ {
+ newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
+ }
+ if (!newRoot.children.contains(child))
+ {
+ newRoot.children.put(child, new Node<T>());
+ }
+ Node<T> newChild = newRoot.children.get(child);
+ boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
+ dataChanged = dataChanged || childChanged;
+ }
+ }
+ } else
+ {
+ logger.info(path + " does not exist");
+ }
+ return dataChanged;
+ }
+
+ static class Node<T>
+ {
+ String name;
+ Stat stat;
+ T data;
+ ConcurrentHashMap<String, Node<T>> children;
+
+ }
+
+ public void print()
+ {
+ logger.info("START "+ _rootPath);
+ LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
+ stack.push(root.get());
+ while (!stack.isEmpty())
+ {
+ Node<T> pop = stack.pop();
+ if (pop != null)
+ {
+ logger.info("name:"+ pop.name);
+ logger.info("\tdata:"+pop.data);
+ if (pop.children != null)
+ {
+ for (Node<T> child : pop.children.values())
+ {
+ stack.push(child);
+ }
+ }
+ }
+ }
+ logger.info("END "+ _rootPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/package-info.java
new file mode 100644
index 0000000..f999c38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Helix cluster controller
+ */
+package org.apache.helix.controller;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
new file mode 100644
index 0000000..c2fb7fa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.Map;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+
+
+/**
+ * Base {@link Stage} whose lifecycle callbacks all do nothing. It also
+ * supplies a default stage name and a helper for reporting stage latency.
+ */
+public class AbstractBaseStage implements Stage
+{
+  /** Does nothing by default. */
+  @Override
+  public void init(StageContext context)
+  {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void preProcess()
+  {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void postProcess()
+  {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void release()
+  {
+  }
+
+  /**
+   * @return the runtime class name of this stage, used as its default name
+   */
+  @Override
+  public String getStageName()
+  {
+    return this.getClass().getName();
+  }
+
+  /**
+   * Reports a latency to the monitor registered under this stage's name, if
+   * the event carries a "HelixStageLatencyMonitorMap" attribute containing
+   * such an entry. Otherwise nothing happens.
+   *
+   * @param event event whose attribute holds the monitors keyed by stage name
+   * @param latency value handed to the monitor
+   */
+  public void addLatencyToMonitor(ClusterEvent event, long latency)
+  {
+    Map<String, HelixStageLatencyMonitor> monitorsByStage =
+        event.getAttribute("HelixStageLatencyMonitorMap");
+    if (monitorsByStage == null)
+    {
+      return;
+    }
+    if (!monitorsByStage.containsKey(getStageName()))
+    {
+      return;
+    }
+    monitorsByStage.get(getStageName()).addStgLatency(latency);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
new file mode 100644
index 0000000..7362747
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.log4j.Logger;
+
+
+/**
+ * An ordered collection of {@link Stage}s that are executed one after the
+ * other for a {@link ClusterEvent}.
+ */
+public class Pipeline
+{
+  private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
+  List<Stage> _stages;
+
+  /** Creates a pipeline without any stage. */
+  public Pipeline()
+  {
+    _stages = new ArrayList<Stage>();
+  }
+
+  /**
+   * Appends a stage to the pipeline and initializes it.
+   *
+   * @param stage stage to append
+   */
+  public void addStage(Stage stage)
+  {
+    _stages.add(stage);
+    // No context exists at this point, so the stage is initialized with null.
+    stage.init(null);
+  }
+
+  /**
+   * Runs preProcess, process and postProcess of every stage, in the order the
+   * stages were added. An exception thrown by a stage is not caught here, so
+   * it ends the run and reaches the caller.
+   *
+   * @param event event passed to each stage's process method
+   * @throws Exception whatever a stage throws
+   */
+  public void handle(ClusterEvent event) throws Exception
+  {
+    if (_stages != null)
+    {
+      for (Stage current : _stages)
+      {
+        current.preProcess();
+        current.process(event);
+        current.postProcess();
+      }
+    }
+  }
+
+  /** Currently does nothing. */
+  public void finish()
+  {
+  }
+
+  /**
+   * @return the list backing this pipeline (not a copy)
+   */
+  public List<Stage> getStages()
+  {
+    return _stages;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
new file mode 100644
index 0000000..e72fc4c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Keeps, per event name, the list of {@link Pipeline}s registered for it.
+ */
+public class PipelineRegistry
+{
+  Map<String, List<Pipeline>> _map;
+
+  /** Creates an empty registry. */
+  public PipelineRegistry()
+  {
+    _map = new HashMap<String, List<Pipeline>>();
+  }
+
+  /**
+   * Appends the given pipelines to the ones already registered for an event.
+   * An entry for the event is created even when no pipeline is passed.
+   *
+   * @param eventName name of the event
+   * @param pipelines pipelines to append, in order
+   */
+  public void register(String eventName, Pipeline... pipelines)
+  {
+    List<Pipeline> registered;
+    if (_map.containsKey(eventName))
+    {
+      registered = _map.get(eventName);
+    }
+    else
+    {
+      registered = new ArrayList<Pipeline>();
+      _map.put(eventName, registered);
+    }
+    Collections.addAll(registered, pipelines);
+  }
+
+  /**
+   * @param eventName name of the event
+   * @return the internally stored list for the event (not a copy), or an
+   *         empty immutable list when the event has no entry
+   */
+  public List<Pipeline> getPipelinesForEvent(String eventName)
+  {
+    return _map.containsKey(eventName)
+        ? _map.get(eventName)
+        : Collections.<Pipeline> emptyList();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
new file mode 100644
index 0000000..662d573
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+
+/**
+ * Logically independent unit in processing callbacks for cluster changes.
+ * A {@link Pipeline} initializes a stage when it is added and, for every
+ * event it handles, invokes {@link #preProcess()},
+ * {@link #process(ClusterEvent)} and {@link #postProcess()} in that order.
+ */
+public interface Stage
+{
+
+  /**
+   * Initializes the stage.
+   *
+   * @param context initialization context; {@link Pipeline#addStage(Stage)}
+   *          currently passes null
+   */
+  void init(StageContext context);
+
+  /**
+   * Invoked right before {@link #process(ClusterEvent)} on each callback.
+   */
+  void preProcess();
+
+  /**
+   * Carries out the actual callback processing.
+   *
+   * @param event the cluster event to process
+   * @throws Exception if processing fails
+   */
+  void process(ClusterEvent event) throws Exception;
+
+  /**
+   * Invoked right after {@link #process(ClusterEvent)} on each callback.
+   */
+  void postProcess();
+
+  /**
+   * Destructs the stage.
+   */
+  void release();
+
+  /**
+   * Returns the name of the stage.
+   *
+   * @return the stage name
+   */
+  String getStageName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
new file mode 100644
index 0000000..17ede49
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+/**
+ * Context object accepted by {@link Stage#init(StageContext)}.
+ * It is currently a placeholder: it declares no fields and no methods.
+ */
+public class StageContext
+{
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
new file mode 100644
index 0000000..0ad9c28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+/**
+ * Checked exception type of the controller pipeline package.
+ */
+public class StageException extends Exception
+{
+  // Exception is Serializable; pin the serial version explicitly instead of
+  // relying on the compiler-dependent default.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an exception without a cause.
+   *
+   * @param message detail message
+   */
+  public StageException(String message)
+  {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that wraps an underlying failure.
+   *
+   * @param message detail message
+   * @param e cause of this exception
+   */
+  public StageException(String message,Exception e)
+  {
+    super(message,e);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
new file mode 100644
index 0000000..03f31ba
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix controller pipeline classes that process cluster changes
+ *
+ */
+package org.apache.helix.controller.pipeline;
\ No newline at end of file