You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC
[9/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
deleted file mode 100644
index 8b1713c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.Iterator;
-import java.util.List;
-
-public abstract class Operator {
-
- public int minInputTupleLists;
- public int maxInputTupleLists;
- public int numOutputTupleLists = -1;
- public boolean inputOutputTupleListsCountsEqual = false;
-
- public Operator()
- {
-
- }
-
- public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2)
- {
- if (tup1 == null) {
- return tup2;
- }
- if (tup2 == null) {
- return tup1;
- }
- Tuple<String>outputTup = new Tuple<String>();
-
-
- 	  //the product staggers if the tuples are different lengths
- 	  //e.g. 1,2,3 * 4,5 = 1,8,15
- //so this is a bit tricky
- Tuple<String>largerTup;
- Tuple<String>smallerTup;
- if (tup1.size() >= tup2.size()) {
- largerTup = tup1;
- smallerTup = tup2;
- }
- else {
- largerTup = tup2;
- smallerTup = tup1;
- }
- int gap = largerTup.size() - smallerTup.size();
-
- for (int i=0; i< largerTup.size();i++) {
- if (i < gap) {
- outputTup.add(largerTup.getElement(i));
- }
- else {
- double elementProduct = 0;
- elementProduct = Double.parseDouble(largerTup.getElement(i)) *
- Double.parseDouble(smallerTup.getElement(i-gap));
- outputTup.add(String.valueOf(elementProduct));
- }
- }
- return outputTup;
- }
-
- public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2)
- {
- if (tup1 == null) {
- return tup2;
- }
- if (tup2 == null) {
- return tup1;
- }
- Tuple<String>outputTup = new Tuple<String>();
-
-
- 	  //the sum staggers if the tuples are different lengths
- 	  //e.g. 1,2,3 + 4,5 = 1,6,8
- //so this is a bit tricky
- Tuple<String>largerTup;
- Tuple<String>smallerTup;
- if (tup1.size() >= tup2.size()) {
- largerTup = tup1;
- smallerTup = tup2;
- }
- else {
- largerTup = tup2;
- smallerTup = tup1;
- }
- int gap = largerTup.size() - smallerTup.size();
-
- for (int i=0; i< largerTup.size();i++) {
- if (i < gap) {
- outputTup.add(largerTup.getElement(i));
- }
- else {
- double elementSum = 0;
- elementSum = Double.parseDouble(largerTup.getElement(i)) +
- Double.parseDouble(smallerTup.getElement(i-gap));
- outputTup.add(String.valueOf(elementSum));
- }
- }
- return outputTup;
- }
-
- public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
deleted file mode 100644
index 01d9fea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-public class Stat {
- String _name;
- Tuple<String> _value;
- Tuple<String> _timestamp;
-
- public Stat(String name, Tuple<String> value, Tuple<String> timestamp)
- {
- _name = name;
- _value = value;
- _timestamp = timestamp;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public Tuple<String> getValue()
- {
- return _value;
- }
-
- public Tuple<String> getTimestamp()
- {
- return _timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java b/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
deleted file mode 100644
index 43ec957..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.stages.HealthDataCache;
-import com.linkedin.helix.model.PersistentStats;
-
-public class StatsHolder
-{
- enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
-
- private static final Logger logger = Logger.getLogger(StatsHolder.class
- .getName());
-
- public static final String VALUE_NAME = "value";
- public static final String TIMESTAMP_NAME = "TimeStamp";
-
- HelixDataAccessor _accessor;
- HealthDataCache _cache;
-
- Map<String, Map<String, String>> _statMap;
- Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
- private Builder _keyBuilder;
- // PersistentStats _persistentStats;
-
- public StatsHolder(HelixManager manager, HealthDataCache cache)
- {
- _accessor = manager.getHelixDataAccessor();
- _cache = cache;
- _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
- updateCache(_cache);
- _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-
- }
-
- public void refreshStats()
- {
- logger.info("Refreshing cached stats");
- _cache.refresh(_accessor);
- updateCache(_cache);
- }
-
- public void persistStats()
- {
- // XXX: Am I using _accessor too directly here?
- // took around 35 ms from desktop to ESV4 machine
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
- if (stats == null)
- {
- stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
- // this record, if it
- // matters
- }
- stats.getRecord().setMapFields(_statMap);
- boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(),
- stats);
- }
-
- public void getStatsFromCache(boolean refresh)
- {
- long refreshStartTime = System.currentTimeMillis();
- if (refresh) {
- _cache.refresh(_accessor);
- }
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null) {
- _statMap = persistentStatRecord.getMapFields();
- }
- else {
- _statMap = new HashMap<String,Map<String,String>>();
- }
- /*
- if (_cache.getPersistentStats() != null) {
-
- _statMap = _cache.getPersistentStats();
- }
- */
- //TODO: confirm this a good place to init the _statMap when null
- /*
- if (_statMap == null) {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- */
- System.out.println("Refresh stats done: "+(System.currentTimeMillis() - refreshStartTime));
- }
-
- public Iterator<String> getAllStats()
- {
- return null;
- }
-
- /*
- * TODO: figure out pre-conditions here. I think not allowing anything to be
- * null on input
- */
- public Map<String, String> mergeStats(String statName,
- Map<String, String> existingStat, Map<String, String> incomingStat)
- throws HelixException
- {
- if (existingStat == null)
- {
- throw new HelixException("existing stat for merge is null");
- }
- if (incomingStat == null)
- {
- throw new HelixException("incoming stat for merge is null");
- }
- // get agg type and arguments, then get agg object
- String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
- String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
- Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
- // XXX: some of below lines might fail with null exceptions
-
- // get timestamps, values out of zk maps
- String existingTime = existingStat.get(TIMESTAMP_NAME);
- String existingVal = existingStat.get(VALUE_NAME);
- String incomingTime = incomingStat.get(TIMESTAMP_NAME);
- String incomingVal = incomingStat.get(VALUE_NAME);
- // parse values into tuples, if the values exist. else, tuples are null
- Tuple<String> existingTimeTuple = (existingTime != null) ? Tuple
- .fromString(existingTime) : null;
- Tuple<String> existingValueTuple = (existingVal != null) ? Tuple
- .fromString(existingVal) : null;
- Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
- .fromString(incomingTime) : null;
- Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
- .fromString(incomingVal) : null;
-
- // dp merge
- agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
- incomingTimeTuple, aggArgs);
- // put merged tuples back in map
- Map<String, String> mergedMap = new HashMap<String, String>();
- if (existingTimeTuple.size() == 0)
- {
- throw new HelixException("merged time tuple has size zero");
- }
- if (existingValueTuple.size() == 0)
- {
- throw new HelixException("merged value tuple has size zero");
- }
-
- mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
- mergedMap.put(VALUE_NAME, existingValueTuple.toString());
- return mergedMap;
- }
-
- /*
- * Find all persisted stats this stat matches. Update those stats. An incoming
- * stat can match multiple stats exactly (if that stat has multiple agg types)
- * An incoming stat can match multiple wildcard stats
- */
-
- // need to do a time check here!
-
- public void applyStat(String incomingStatName, Map<String, String> statFields)
- {
- // TODO: consider locking stats here
- //refreshStats(); //will have refreshed by now during stage
-
- Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-
- if(!_statAlertMatchResult.containsKey(incomingStatName))
- {
- _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
- }
- Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
- // traverse through all persistent stats
- for (String key : _statMap.keySet())
- {
- if(resultMap.containsKey(key))
- {
- MatchResult cachedMatchResult = resultMap.get(key);
- if(cachedMatchResult == MatchResult.EXACTMATCH)
- {
- processExactMatch(key, statFields);
- }
- else if(cachedMatchResult == MatchResult.WILDCARDMATCH)
- {
- processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
- }
- // don't care about NOMATCH
- continue;
- }
- // exact match on stat and stat portion of persisted stat, just update
- if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName))
- {
- processExactMatch(key, statFields);
- resultMap.put(key, MatchResult.EXACTMATCH);
- }
- // wildcard match
- else if (ExpressionParser.isIncomingStatWildcardMatch(key,
- incomingStatName))
- {
- processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
- resultMap.put(key, MatchResult.WILDCARDMATCH);
- }
- else
- {
- resultMap.put(key, MatchResult.NOMATCH);
- }
- }
- _statMap.putAll(pendingAdds);
- }
-
- void processExactMatch(String key, Map<String, String> statFields)
- {
- Map<String, String> mergedStat = mergeStats(key, _statMap.get(key),
- statFields);
- // update in place, no problem with hash map
- _statMap.put(key, mergedStat);
- }
-
- void processWildcardMatch(String incomingStatName, String key,
- Map<String, String> statFields, Map<String, Map<String, String>> pendingAdds)
- {
-
- // make sure incoming stat doesn't already exist, either in previous
- // round or this round
- // form new key (incomingStatName with agg type from the wildcarded
- // stat)
- String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
- incomingStatName);
- // if the stat already existed in _statMap, we have/will apply it as an
- // exact match
- // if the stat was added this round to pendingAdds, no need to recreate
- // (it would have same value)
- if (!_statMap.containsKey(statToAdd)
- && !pendingAdds.containsKey(statToAdd))
- {
- // add this stat to persisted stats
- Map<String, String> mergedStat = mergeStats(statToAdd,
- getEmptyStat(), statFields);
- // add to pendingAdds so we don't mess up ongoing traversal of
- // _statMap
- pendingAdds.put(statToAdd, mergedStat);
- }
- }
-
- // add parsing of stat (or is that in expression holder?) at least add
- // validate
- public void addStat(String exp) throws HelixException
- {
- refreshStats(); // get current stats
-
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
- for (String stat : parsedStats)
- {
- if (_statMap.containsKey(stat))
- {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- _statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- }
-
- public static Map<String, Map<String, String>> parseStat(String exp)
- throws HelixException
- {
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
- Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
- for (String stat : parsedStats)
- {
- if (statMap.containsKey(stat))
- {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- return statMap;
- }
-
-
- public static Map<String, String> getEmptyStat()
- {
- Map<String, String> statFields = new HashMap<String, String>();
- statFields.put(TIMESTAMP_NAME, "");
- statFields.put(VALUE_NAME, "");
- return statFields;
- }
-
- public List<Stat> getStatsList()
- {
- List<Stat> stats = new LinkedList<Stat>();
- for (String stat : _statMap.keySet())
- {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- Stat s = new Stat(stat, valTup, timeTup);
- stats.add(s);
- }
- return stats;
- }
-
- public Map<String, Tuple<String>> getStatsMap()
- {
- //refreshStats(); //don't refresh, stage will have refreshed by this time
- HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
- for (String stat : _statMap.keySet())
- {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- stats.put(stat, valTup);
- }
- return stats;
- }
-
- public void updateCache(HealthDataCache cache)
- {
- _cache = cache;
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null)
- {
- _statMap = persistentStatRecord.getMapFields();
- }
- else
- {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 9f3b2a2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
- public SumEachOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = true;
- numOutputTupleLists = -1;
- }
-
- //for each column, generate sum
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
- for (Iterator<Tuple<String>> currIt : input) {
- Tuple<String> currSum = null;
- while (currIt.hasNext()) {
- currSum = sumTuples(currSum, currIt.next());
- }
- ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
- currOutList.add(currSum);
- out.add(currOutList.iterator());
- }
- return out;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
deleted file mode 100644
index 760f076..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
- public SumOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = false;
- numOutputTupleLists = 1;
- }
-
-
- public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input)
- {
- List out = new ArrayList();
- out.add(input.iterator());
- return out;
- }
-
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
- if (input == null || input.size() == 0) {
- return singleSetToIter(output);
- }
- while (true) { //loop through set of iters, return when 1 runs out (not completing the row in progress)
- Tuple<String> rowSum = null;
- for (Iterator<Tuple<String>> it : input) {
- if (!it.hasNext()) { //when any iterator runs out, we are done
- return singleSetToIter(output);
- }
- rowSum = sumTuples(rowSum, it.next());
- }
- output.add(rowSum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
deleted file mode 100644
index 9336e47..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.Vector;
-
-public class Tuple<T> {
- List<T> elements;
-
- public Tuple()
- {
- elements = new ArrayList<T>();
- }
-
- public int size()
- {
- return elements.size();
- }
-
- public void add(T entry)
- {
- elements.add(entry);
- }
-
- public void addAll(Tuple<T> incoming)
- {
- elements.addAll(incoming.getElements());
- }
-
- public Iterator<T> iterator()
- {
- return elements.listIterator();
- }
-
- public T getElement(int ind)
- {
- return elements.get(ind);
- }
-
- public List<T> getElements()
- {
- return elements;
- }
-
- public void clear()
- {
- elements.clear();
- }
-
- public static Tuple<String> fromString(String in)
- {
- Tuple<String> tup = new Tuple<String>();
- if (in.length() > 0) {
- String[] elements = in.split(",");
- for (String element : elements) {
- tup.add(element);
- }
- }
- return tup;
- }
-
- public String toString()
- {
- StringBuilder out = new StringBuilder();
- Iterator<T> it = iterator();
- boolean outEmpty=true;
- while (it.hasNext()) {
- if (!outEmpty) {
- out.append(",");
- }
- out.append(it.next());
- outEmpty = false;
- }
- return out.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 132f23a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.Iterator;
-
-import com.linkedin.helix.HelixException;
-
-public class WindowAggregator extends Aggregator {
-
-
- int _windowSize;
-
- public WindowAggregator(String windowSize)
- {
- _windowSize = Integer.parseInt(windowSize);
- _numArgs = 1;
- }
-
- public WindowAggregator()
- {
- this("1");
- }
-
- @Override
- public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
- Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
-
- _windowSize = Integer.parseInt(args[0]);
-
- //figure out how many curr tuple values we displace
- Tuple<String> mergedTimeTuple = new Tuple<String>();
- Tuple<String> mergedValTuple = new Tuple<String>();
-
- Iterator<String> currTimeIter = currTimeTup.iterator();
- Iterator<String> currValIter = currValTup.iterator();
- Iterator<String> newTimeIter = newTimeTup.iterator();
- Iterator<String> newValIter = newValTup.iterator();
- int currCtr = 0;
- //traverse current vals
- double currTime = -1;
- double currVal;
- while (currTimeIter.hasNext()) {
- currTime = Double.parseDouble(currTimeIter.next());
- currVal = Double.parseDouble(currValIter.next());
- currCtr++;
- //number of evicted currVals equal to total size of both minus _windowSize
- if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element, just bump down
- mergedTimeTuple.add(String.valueOf(currTime));
- mergedValTuple.add(String.valueOf(currVal));
- }
- }
-
- double newVal;
- double newTime;
- while (newTimeIter.hasNext()) {
- newVal = Double.parseDouble(newValIter.next());
- newTime = Double.parseDouble(newTimeIter.next());
- if (newTime <= currTime) { //oldest new time older than newest curr time. we will not apply new tuple!
- return; //curr tuples remain the same
- }
- currCtr++;
- if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element
- mergedTimeTuple.add(String.valueOf(newTime));
- mergedValTuple.add(String.valueOf(newVal));
- }
- }
- //set curr tuples to merged tuples
- currTimeTup.clear();
- currTimeTup.addAll(mergedTimeTuple);
- currValTup.clear();
- currValTup.addAll(mergedValTuple);
- //TODO: see if we can do merger in place on curr
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java b/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
deleted file mode 100644
index 88a99ae..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Classes for Helix alerts
- */
-package com.linkedin.helix.alerts;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java b/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
deleted file mode 100644
index 28bcb34..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.CurrentState.CurrentStateProperty;
-
-/*
- * ZKRoutingInfoProvider keeps a copy of the routing table. Given a partition id,
- * it will return
- *
- * 1. The list of partition that can be read
- * 2. the master partition, for write operation
- *
- * The routing table is constructed from the currentState of each storage nodes.
- * The current state is a list of following pairs: partition-id:State(MASTER / SLAVE)
- *
- * TODO: move the code as part of router process
- * TODO: add listeners to node current state changes
- * */
-public class ExternalViewGenerator
-{
- static Logger _logger = Logger.getLogger(ExternalViewGenerator.class);
-
- /*
- * Given a list of external view ZNRecord nodes(one for each cluster),
- * calculate the routing map.
- *
- * The format of the routing map is like this:
- *
- * Map<String, Map<String, Set<String>>> maps from a partitionName to its
- * states Map<String, List<String>> The second Map maps from a state
- * ("MASTER", "SLAVE"...) to a list of nodeNames
- *
- * So that the we can query the map for the list of nodes by providing the
- * partition name and the expected state.
- */
- public Map<String, Map<String, Set<String>>> getRouterMapFromExternalView(
- List<ZNRecord> dbExternalViewList)
- {
- Map<String, Map<String, Set<String>>> result = new TreeMap<String, Map<String, Set<String>>>();
-
- for (ZNRecord dbNodeView : dbExternalViewList)
- {
- Map<String, Map<String, String>> dbNodeStateMap = dbNodeView
- .getMapFields();
- for (String partitionId : dbNodeStateMap.keySet())
- {
- if (!result.containsKey(partitionId))
- {
- result.put(partitionId, new TreeMap<String, Set<String>>());
- }
- Map<String, String> nodeStateMap = dbNodeStateMap.get(partitionId);
- for (String nodeName : nodeStateMap.keySet())
- {
- String state = nodeStateMap.get(nodeName);
- if (!result.get(partitionId).containsKey(state))
- {
- result.get(partitionId).put(state, new TreeSet<String>());
- }
- result.get(partitionId).get(state).add(nodeName);
- }
- }
- }
- return result;
- }
-
- /*
- * The parameter is a map that maps the nodeName to a list of ZNRecords.
- */
- public List<ZNRecord> computeExternalView(
- Map<String, List<ZNRecord>> currentStates, List<ZNRecord> idealStates)
- {
- List<ZNRecord> resultList = new ArrayList<ZNRecord>();
- Map<String, ZNRecord> resultRoutingTable = new HashMap<String, ZNRecord>();
- // maps from dbName to another map : partition -> map <nodename,
- // master/slave>;
- // Fill the routing table with "empty" default state according to ideals
- // states
- // in the cluster
- if (idealStates != null)
- {
- for (ZNRecord idealState : idealStates)
- {
- ZNRecord defaultDBExternalView = new ZNRecord(idealState.getId());
- resultRoutingTable.put(idealState.getId(), defaultDBExternalView);
- }
- } else
- {
- assert (!currentStates.isEmpty());
- return resultList;
- }
- for (String nodeName : currentStates.keySet())
- {
- List<ZNRecord> zndbStates = currentStates.get(nodeName);
- for (ZNRecord dbNodeStateRecord : zndbStates)
- {
- Map<String, Map<String, String>> dbStates = dbNodeStateRecord
- .getMapFields();
- for (String stateUnitKey : dbStates.keySet())
- {
- Map<String, String> dbPartitionStates = dbStates.get(stateUnitKey);
- String dbName = dbPartitionStates
- .get(Message.Attributes.RESOURCE_NAME.toString());
- ZNRecord partitionStatus = resultRoutingTable.get(dbName);
- if (partitionStatus == null)
- {
- partitionStatus = new ZNRecord(dbName);
- resultRoutingTable.put(dbName, partitionStatus);
- }
- String currentStateKey = CurrentStateProperty.CURRENT_STATE.toString();
-
- if (!partitionStatus.getMapFields().containsKey(stateUnitKey))
- {
- partitionStatus.setMapField(stateUnitKey,
- new TreeMap<String, String>());
- }
- partitionStatus.getMapField(stateUnitKey).put(nodeName,
- dbPartitionStates.get(currentStateKey));
-
- }
- }
- }
- for (ZNRecord record : resultRoutingTable.values())
- {
- resultList.add(record);
- }
- return resultList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java b/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
deleted file mode 100644
index 08e99c6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.Pipeline;
-import com.linkedin.helix.controller.pipeline.PipelineRegistry;
-import com.linkedin.helix.controller.stages.BestPossibleStateCalcStage;
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.controller.stages.CompatibilityCheckStage;
-import com.linkedin.helix.controller.stages.CurrentStateComputationStage;
-import com.linkedin.helix.controller.stages.ExternalViewComputeStage;
-import com.linkedin.helix.controller.stages.MessageGenerationPhase;
-import com.linkedin.helix.controller.stages.MessageSelectionStage;
-import com.linkedin.helix.controller.stages.MessageThrottleStage;
-import com.linkedin.helix.controller.stages.ReadClusterDataStage;
-import com.linkedin.helix.controller.stages.ResourceComputationStage;
-import com.linkedin.helix.controller.stages.TaskAssignmentStage;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.PauseSignal;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-import com.linkedin.helix.monitoring.mbeans.MessageQueueMonitor;
-
-/**
- * Cluster Controllers main goal is to keep the cluster state as close as possible to
- * Ideal State. It does this by listening to changes in cluster state and scheduling new
- * tasks to get cluster state to best possible ideal state. Every instance of this class
- * can control can control only one cluster
- *
- *
- * Get all the partitions use IdealState, CurrentState and Messages <br>
- * foreach partition <br>
- * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
- * 2. compute best possible state (instance,state) pair. This needs previous step data and
- * state model constraints <br>
- * 3. compute the messages/tasks needed to move to 1 to 2 <br>
- * 4. select the messages that can be sent, needs messages and state model constraints <br>
- * 5. send messages
- */
-public class GenericHelixController implements
- ConfigChangeListener,
- IdealStateChangeListener,
- LiveInstanceChangeListener,
- MessageListener,
- CurrentStateChangeListener,
- ExternalViewChangeListener,
- ControllerChangeListener,
- HealthStateChangeListener
-{
- private static final Logger logger =
- Logger.getLogger(GenericHelixController.class.getName());
- volatile boolean init = false;
- private final PipelineRegistry _registry;
-
- /**
- * Since instance current state is per-session-id, we need to track the session-ids of
- * the current states that the ClusterController is observing. this set contains all the
- * session ids that we add currentState listener
- */
- private final Set<String> _instanceCurrentStateChangeSubscriptionSessionIds;
-
- /**
- * this set contains all the instance names that we add message listener
- */
- private final Set<String> _instanceSubscriptionNames;
-
- ClusterStatusMonitor _clusterStatusMonitor;
-
-
- /**
- * The _paused flag is checked by function handleEvent(), while if the flag is set
- * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
- * is set.
- */
- private boolean _paused;
-
- /**
- * The timer that can periodically run the rebalancing pipeline. The timer will start if there
- * is one resource group has the config to use the timer.
- */
- Timer _rebalanceTimer = null;
- int _timerPeriod = Integer.MAX_VALUE;
-
- /**
- * Default constructor that creates a default pipeline registry. This is sufficient in
- * most cases, but if there is a some thing specific needed use another constructor
- * where in you can pass a pipeline registry
- */
- public GenericHelixController()
- {
- this(createDefaultRegistry());
- }
-
- class RebalanceTask extends TimerTask
- {
- HelixManager _manager;
-
- public RebalanceTask(HelixManager manager)
- {
- _manager = manager;
- }
-
- @Override
- public void run()
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- ClusterEvent event = new ClusterEvent("periodicalRebalance");
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("changeContext", changeContext);
- List<ZNRecord> dummy = new ArrayList<ZNRecord>();
- event.addAttribute("eventData", dummy);
- // Should be able to process
- handleEvent(event);
- }
- }
-
- /**
- * Starts the rebalancing timer with the specified period. Start the timer if necessary;
- * If the period is smaller than the current period, cancel the current timer and use
- * the new period.
- */
- void startRebalancingTimer(int period, HelixManager manager)
- {
- logger.info("Controller starting timer at period " + period);
- if(period < _timerPeriod)
- {
- if(_rebalanceTimer != null)
- {
- _rebalanceTimer.cancel();
- }
- _rebalanceTimer = new Timer(true);
- _timerPeriod = period;
- _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
- }
- else
- {
- logger.info("Controller already has timer at period " + _timerPeriod);
- }
- }
-
- /**
- * Starts the rebalancing timer
- */
- void stopRebalancingTimer()
- {
- if(_rebalanceTimer != null)
- {
- _rebalanceTimer.cancel();
- _rebalanceTimer = null;
- }
- _timerPeriod = Integer.MAX_VALUE;
- }
-
- private static PipelineRegistry createDefaultRegistry()
- {
- logger.info("createDefaultRegistry");
- synchronized (GenericHelixController.class)
- {
- PipelineRegistry registry = new PipelineRegistry();
-
- // cluster data cache refresh
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
-
- // rebalance pipeline
- Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
-
- // external view generation
- Pipeline externalViewPipeline = new Pipeline();
- externalViewPipeline.addStage(new ExternalViewComputeStage());
-
- // backward compatibility check
- Pipeline liveInstancePipeline = new Pipeline();
- liveInstancePipeline.addStage(new CompatibilityCheckStage());
-
- registry.register("idealStateChange", dataRefresh, rebalancePipeline);
- registry.register("currentStateChange",
- dataRefresh,
- rebalancePipeline,
- externalViewPipeline);
- registry.register("configChange", dataRefresh, rebalancePipeline);
- registry.register("liveInstanceChange",
- dataRefresh,
- liveInstancePipeline,
- rebalancePipeline,
- externalViewPipeline);
-
- registry.register("messageChange",
- dataRefresh,
- rebalancePipeline);
- registry.register("externalView", dataRefresh);
- registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
- registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
-
- // health stats pipeline
- // Pipeline healthStatsAggregationPipeline = new Pipeline();
- // StatsAggregationStage statsStage = new StatsAggregationStage();
- // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
- // healthStatsAggregationPipeline.addStage(statsStage);
- // registry.register("healthChange", healthStatsAggregationPipeline);
-
- return registry;
- }
- }
-
- public GenericHelixController(PipelineRegistry registry)
- {
- _paused = false;
- _registry = registry;
- _instanceCurrentStateChangeSubscriptionSessionIds =
- new ConcurrentSkipListSet<String>();
- _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
- // _externalViewGenerator = new ExternalViewGenerator();
- }
-
- /**
- * lock-always: caller always needs to obtain an external lock before call, calls to
- * handleEvent() should be serialized
- *
- * @param event
- */
- protected synchronized void handleEvent(ClusterEvent event)
- {
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null)
- {
- logger.error("No cluster manager in event:" + event.getName());
- return;
- }
-
- if (!manager.isLeader())
- {
- logger.error("Cluster manager: " + manager.getInstanceName()
- + " is not leader. Pipeline will not be invoked");
- return;
- }
-
- if (_paused)
- {
- logger.info("Cluster is paused. Ignoring the event:" + event.getName());
- return;
- }
-
- NotificationContext context = null;
- if (event.getAttribute("changeContext") != null)
- {
- context = (NotificationContext) (event.getAttribute("changeContext"));
- }
-
- // Initialize _clusterStatusMonitor
- if (context != null)
- {
- if (context.getType() == Type.FINALIZE)
- {
- if (_clusterStatusMonitor != null)
- {
- _clusterStatusMonitor.reset();
- _clusterStatusMonitor = null;
- }
-
- stopRebalancingTimer();
- logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
- return;
- }
- else
- {
- if (_clusterStatusMonitor == null)
- {
- _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
- }
-
- event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
- }
- }
-
- List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
- if (pipelines == null || pipelines.size() == 0)
- {
- logger.info("No pipeline to run for event:" + event.getName());
- return;
- }
-
- for (Pipeline pipeline : pipelines)
- {
- try
- {
- pipeline.handle(event);
- pipeline.finish();
- }
- catch (Exception e)
- {
- logger.error("Exception while executing pipeline: " + pipeline
- + ". Will not continue to next pipeline", e);
- break;
- }
- }
- }
-
- // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
- // callback
-
- @Override
- public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext)
- {
-// logger.info("START: GenericClusterController.onExternalViewChange()");
-// ClusterEvent event = new ClusterEvent("externalViewChange");
-// event.addAttribute("helixmanager", changeContext.getManager());
-// event.addAttribute("changeContext", changeContext);
-// event.addAttribute("eventData", externalViewList);
-// // handleEvent(event);
-// logger.info("END: GenericClusterController.onExternalViewChange()");
- }
-
- @Override
- public void onStateChange(String instanceName,
- List<CurrentState> statesInfo,
- NotificationContext changeContext)
- {
- logger.info("START: GenericClusterController.onStateChange()");
- ClusterEvent event = new ClusterEvent("currentStateChange");
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("instanceName", instanceName);
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("eventData", statesInfo);
- handleEvent(event);
- logger.info("END: GenericClusterController.onStateChange()");
- }
-
- @Override
- public void onHealthChange(String instanceName,
- List<HealthStat> reports,
- NotificationContext changeContext)
- {
- /**
- * When there are more participant ( > 20, can be in hundreds), This callback can be
- * called quite frequently as each participant reports health stat every minute. Thus
- * we change the health check pipeline to run in a timer callback.
- */
- }
-
- @Override
- public void onMessage(String instanceName,
- List<Message> messages,
- NotificationContext changeContext)
- {
- logger.info("START: GenericClusterController.onMessage()");
-
- ClusterEvent event = new ClusterEvent("messageChange");
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("instanceName", instanceName);
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("eventData", messages);
- handleEvent(event);
-
- if (_clusterStatusMonitor != null && messages != null)
- {
- _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
- }
-
- logger.info("END: GenericClusterController.onMessage()");
- }
-
- @Override
- public void onLiveInstanceChange(List<LiveInstance> liveInstances,
- NotificationContext changeContext)
- {
- logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
- if (liveInstances == null)
- {
- liveInstances = Collections.emptyList();
- }
- // Go though the live instance list and make sure that we are observing them
- // accordingly. The action is done regardless of the paused flag.
- if (changeContext.getType() == NotificationContext.Type.INIT ||
- changeContext.getType() == NotificationContext.Type.CALLBACK)
- {
- checkLiveInstancesObservation(liveInstances, changeContext);
- }
-
- ClusterEvent event = new ClusterEvent("liveInstanceChange");
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("eventData", liveInstances);
- handleEvent(event);
- logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
- }
-
- void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
- {
- if (manager.getConfigAccessor() == null)
- {
- logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
- return;
- }
-
- for(IdealState idealState : idealStates)
- {
- int period = idealState.getRebalanceTimerPeriod();
- if(period > 0)
- {
- startRebalancingTimer(period, manager);
- }
- }
- }
-
- @Override
- public void onIdealStateChange(List<IdealState> idealStates,
- NotificationContext changeContext)
- {
- logger.info("START: Generic GenericClusterController.onIdealStateChange()");
- ClusterEvent event = new ClusterEvent("idealStateChange");
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("eventData", idealStates);
- handleEvent(event);
-
- if(changeContext.getType() != Type.FINALIZE)
- {
- checkRebalancingTimer(changeContext.getManager(), idealStates);
- }
-
- logger.info("END: Generic GenericClusterController.onIdealStateChange()");
- }
-
- @Override
- public void onConfigChange(List<InstanceConfig> configs,
- NotificationContext changeContext)
- {
- logger.info("START: GenericClusterController.onConfigChange()");
- ClusterEvent event = new ClusterEvent("configChange");
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("eventData", configs);
- handleEvent(event);
- logger.info("END: GenericClusterController.onConfigChange()");
- }
-
- @Override
- public void onControllerChange(NotificationContext changeContext)
- {
- logger.info("START: GenericClusterController.onControllerChange()");
- HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
-
- // double check if this controller is the leader
- Builder keyBuilder = accessor.keyBuilder();
- LiveInstance leader =
- accessor.getProperty(keyBuilder.controllerLeader());
- if (leader == null)
- {
- logger.warn("No controller exists for cluster:"
- + changeContext.getManager().getClusterName());
- return;
- }
- else
- {
- String leaderName = leader.getInstanceName();
-
- String instanceName = changeContext.getManager().getInstanceName();
- if (leaderName == null || !leaderName.equals(instanceName))
- {
- logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
- + leader);
- return;
- }
- }
-
- PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
- if (pauseSignal != null)
- {
- _paused = true;
- logger.info("controller is now paused");
- }
- else
- {
- if (_paused)
- {
- // it currently paused
- logger.info("controller is now resumed");
- _paused = false;
- ClusterEvent event = new ClusterEvent("resume");
- event.addAttribute("changeContext", changeContext);
- event.addAttribute("helixmanager", changeContext.getManager());
- event.addAttribute("eventData", pauseSignal);
- handleEvent(event);
- }
- else
- {
- _paused = false;
- }
- }
- logger.info("END: GenericClusterController.onControllerChange()");
- }
-
- /**
- * Go through the list of liveinstances in the cluster, and add currentstateChange
- * listener and Message listeners to them if they are newly added. For current state
- * change, the observation is tied to the session id of each live instance.
- *
- */
- protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
- NotificationContext changeContext)
- {
- for (LiveInstance instance : liveInstances)
- {
- String instanceName = instance.getId();
- String clientSessionId = instance.getSessionId();
- HelixManager manager = changeContext.getManager();
-
- // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
- // that we've added a currentState listener
- if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
- {
- try
- {
- manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
- _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
- logger.info("Observing client session id: " + clientSessionId);
- }
- catch (Exception e)
- {
- logger.error("Exception adding current state and message listener for instance:"
- + instanceName,
- e);
- }
- }
-
- // _instanceSubscriptionNames contains all the instanceNames that we've added a
- // message listener
- if (!_instanceSubscriptionNames.contains(instanceName))
- {
- try
- {
- logger.info("Adding message listener for " + instanceName);
- manager.addMessageListener(this, instanceName);
- _instanceSubscriptionNames.add(instanceName);
- }
- catch (Exception e)
- {
- logger.error("Exception adding message listener for instance:" + instanceName,
- e);
- }
- }
-
- // TODO we need to remove currentState listeners and message listeners
- // when a session or an instance no longer exists. This may happen
- // in case of session expiry, participant rebound, participant goes and new
- // participant comes
-
- // TODO shi should call removeListener on the previous session id;
- // but the removeListener with that functionality is not implemented yet
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
deleted file mode 100644
index 9ab8851..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-/**
- * start cluster manager controller
- * cluster manager controller has two modes:
- * 1) stand-alone mode: in this mode each controller gets a list of clusters
- * and competes via leader election to become the controller for any of the clusters.
- * if a controller fails to become the leader of a given cluster, it remains as a standby
- * and re-does the leader election when the current leader fails
- *
- * 2) distributed mode: in this mode each controller first joins as participant into
- * a special CONTROLLER_CLUSTER. Leader election happens in this special
- * cluster. The one that becomes the leader controls all controllers (including itself
- * to become leaders of other clusters.
- */
-
-import java.util.Arrays;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.participant.DistClusterControllerStateModelFactory;
-import com.linkedin.helix.participant.StateMachineEngine;
-
-public class HelixControllerMain
-{
- public static final String zkServerAddress = "zkSvr";
- public static final String cluster = "cluster";
- public static final String help = "help";
- public static final String mode = "mode";
- public static final String propertyTransferServicePort = "propertyTransferPort";
- public static final String name = "controllerName";
- public static final String STANDALONE = "STANDALONE";
- public static final String DISTRIBUTED = "DISTRIBUTED";
- private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
-
- // hack: OptionalBuilder is not thread safe
- @SuppressWarnings("static-access")
- synchronized private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option modeOption = OptionBuilder
- .withLongOpt(mode)
- .withDescription(
- "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
- .create();
- modeOption.setArgs(1);
- modeOption.setRequired(false);
- modeOption.setArgName("Cluster controller mode (Optional)");
-
- Option controllerNameOption = OptionBuilder.withLongOpt(name)
- .withDescription("Provide cluster controller name (Optional)").create();
- controllerNameOption.setArgs(1);
- controllerNameOption.setRequired(false);
- controllerNameOption.setArgName("Cluster controller name (Optional)");
-
- Option portOption = OptionBuilder
- .withLongOpt(propertyTransferServicePort)
- .withDescription(
- "Webservice port for ZkProperty controller transfer")
- .create();
- portOption.setArgs(1);
- portOption.setRequired(false);
- portOption.setArgName("Cluster controller property transfer port (Optional)");
-
- Options options = new Options();
- options.addOption(helpOption);
- options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(modeOption);
- options.addOption(portOption);
- options.addOption(controllerNameOption);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.setWidth(1000);
- helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
-
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void addListenersToController(HelixManager manager,
- GenericHelixController controller)
- {
- try
- {
- manager.addConfigChangeListener(controller);
- manager.addLiveInstanceChangeListener(controller);
- manager.addIdealStateChangeListener(controller);
- manager.addExternalViewChangeListener(controller);
- manager.addControllerListener(controller);
- } catch (ZkInterruptedException e)
- {
- logger
- .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
- + e);
- } catch (Exception e)
- {
- logger.error("Error when creating HelixManagerContollerMonitor", e);
- }
- }
-
- public static HelixManager startHelixController(final String zkConnectString,
- final String clusterName, final String controllerName, final String controllerMode)
- {
- HelixManager manager = null;
- try
- {
- if (controllerMode.equalsIgnoreCase(STANDALONE))
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
- InstanceType.CONTROLLER, zkConnectString);
- manager.connect();
- } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
- InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
-
- DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
- zkConnectString);
-
- // StateMachineEngine genericStateMachineHandler = new
- // StateMachineEngine();
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- // genericStateMachineHandler);
- manager.connect();
- } else
- {
- logger.error("cluster controller mode:" + controllerMode + " NOT supported");
- // throw new
- // IllegalArgumentException("Unsupported cluster controller mode:" +
- // controllerMode);
- }
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- return manager;
- }
-
- public static void main(String[] args) throws Exception
- {
- // read the config;
- // check if the this process is the master wait indefinitely
- // other approach is always process the events but when updating the zk
- // check if this is master.
- // This is difficult to get right
- // get the clusters to manage
- // for each cluster create a manager
- // add the respective listeners for each manager
- CommandLine cmd = processCommandLineArgs(args);
- String zkConnectString = cmd.getOptionValue(zkServerAddress);
- String clusterName = cmd.getOptionValue(cluster);
- String controllerMode = STANDALONE;
- String controllerName = null;
- int propertyTransServicePort = -1;
-
- if (cmd.hasOption(mode))
- {
- controllerMode = cmd.getOptionValue(mode);
- }
-
- if(cmd.hasOption(propertyTransferServicePort))
- {
- propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
- }
- if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
- {
- throw new IllegalArgumentException(
- "A unique cluster controller name is required in DISTRIBUTED mode");
- }
-
- controllerName = cmd.getOptionValue(name);
-
- // Espresso_driver.py will consume this
- logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
- + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
-
- if (propertyTransServicePort > 0)
- {
- ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
- }
-
- HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
- controllerMode);
- try
- {
- Thread.currentThread().join();
- }
- catch (InterruptedException e)
- {
- logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
- + " interrupted");
- }
- finally
- {
- manager.disconnect();
- ZKPropertyTransferServer.getInstance().shutdown();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
deleted file mode 100644
index 4d9c217..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.io.FileFilter;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-
-/**
- * Generic class that will read the data given the root path.
- *
- * @author kgopalak
- *
- */
-public class HierarchicalDataHolder<T>
-{
- private static final Logger logger = Logger
- .getLogger(HierarchicalDataHolder.class.getName());
- AtomicReference<Node<T>> root;
-
- /**
- * currentVersion, gets updated when data is read from original source
- */
- AtomicLong currentVersion;
- private final ZkClient _zkClient;
- private final String _rootPath;
- private final FileFilter _filter;
-
- public HierarchicalDataHolder(ZkClient client, String rootPath,
- FileFilter filter)
- {
- this._zkClient = client;
- this._rootPath = rootPath;
- this._filter = filter;
- // Node<T> initialValue = new Node<T>();
- root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
- currentVersion = new AtomicLong(1);
- refreshData();
- }
-
- public long getVersion()
- {
- return currentVersion.get();
- }
-
- public boolean refreshData()
- {
- Node<T> newRoot = new Node<T>();
- boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
- if (dataChanged)
- {
- currentVersion.getAndIncrement();
- root.set(newRoot);
- return true;
- } else
- {
- return false;
- }
- }
-
- // private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
- // newRoot,Stat newStat, String path)
- private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
- String path)
- {
- boolean dataChanged = false;
- Stat newStat = _zkClient.getStat(path);
- Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
- newRoot.name = path;
- if (newStat != null)
- {
- if (oldStat == null)
- {
- newRoot.stat = newStat;
- newRoot.data = _zkClient.<T> readData(path, true);
- dataChanged = true;
- } else if (newStat.equals(oldStat))
- {
- newRoot.stat = oldStat;
- newRoot.data = oldRoot.data;
- } else
- {
- dataChanged = true;
- newRoot.stat = newStat;
- newRoot.data = _zkClient.<T> readData(path, true);
- }
- if (newStat.getNumChildren() > 0)
- {
- List<String> children = _zkClient.getChildren(path);
- for (String child : children)
- {
- String newPath = path + "/" + child;
- Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
- .get(child) : null;
- if (newRoot.children == null)
- {
- newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
- }
- if (!newRoot.children.contains(child))
- {
- newRoot.children.put(child, new Node<T>());
- }
- Node<T> newChild = newRoot.children.get(child);
- boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
- dataChanged = dataChanged || childChanged;
- }
- }
- } else
- {
- logger.info(path + " does not exist");
- }
- return dataChanged;
- }
-
- static class Node<T>
- {
- String name;
- Stat stat;
- T data;
- ConcurrentHashMap<String, Node<T>> children;
-
- }
-
- public void print()
- {
- logger.info("START "+ _rootPath);
- LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
- stack.push(root.get());
- while (!stack.isEmpty())
- {
- Node<T> pop = stack.pop();
- if (pop != null)
- {
- logger.info("name:"+ pop.name);
- logger.info("\tdata:"+pop.data);
- if (pop.children != null)
- {
- for (Node<T> child : pop.children.values())
- {
- stack.push(child);
- }
- }
- }
- }
- logger.info("END "+ _rootPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
deleted file mode 100644
index 54013e4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Helix cluster controller
- */
-package com.linkedin.helix.controller;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
deleted file mode 100644
index 71b4a92..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.Map;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.monitoring.mbeans.HelixStageLatencyMonitor;
-
-public class AbstractBaseStage implements Stage
-{
- @Override
- public void init(StageContext context)
- {
-
- }
-
- @Override
- public void preProcess()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
-
- }
-
- @Override
- public void postProcess()
- {
-
- }
-
- @Override
- public void release()
- {
-
- }
-
- @Override
- public String getStageName()
- {
- // default stage name will be the class name
- String className = this.getClass().getName();
- return className;
- }
-
- public void addLatencyToMonitor(ClusterEvent event, long latency)
- {
- Map<String, HelixStageLatencyMonitor> stgLatencyMonitorMap =
- event.getAttribute("HelixStageLatencyMonitorMap");
- if (stgLatencyMonitorMap != null)
- {
- if (stgLatencyMonitorMap.containsKey(getStageName()))
- {
- HelixStageLatencyMonitor stgLatencyMonitor =
- stgLatencyMonitorMap.get(getStageName());
- stgLatencyMonitor.addStgLatency(latency);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
deleted file mode 100644
index 63518b1..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-
-public class Pipeline
-{
- private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
- List<Stage> _stages;
-
- public Pipeline()
- {
- _stages = new ArrayList<Stage>();
- }
-
- public void addStage(Stage stage)
- {
- _stages.add(stage);
- StageContext context = null;
- stage.init(context);
- }
-
- public void handle(ClusterEvent event) throws Exception
- {
- if (_stages == null)
- {
- return;
- }
- for (Stage stage : _stages)
- {
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
- }
-
- public void finish()
- {
-
- }
-
- public List<Stage> getStages()
- {
- return _stages;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
deleted file mode 100644
index fea8cba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class PipelineRegistry
-{
- Map<String, List<Pipeline>> _map;
-
- public PipelineRegistry()
- {
- _map = new HashMap<String, List<Pipeline>>();
- }
-
- public void register(String eventName, Pipeline... pipelines)
- {
- if (!_map.containsKey(eventName))
- {
- _map.put(eventName, new ArrayList<Pipeline>());
- }
- List<Pipeline> list = _map.get(eventName);
- for (Pipeline pipeline : pipelines)
- {
- list.add(pipeline);
- }
- }
-
- public List<Pipeline> getPipelinesForEvent(String eventName)
- {
- if (_map.containsKey(eventName))
- {
- return _map.get(eventName);
- }
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
deleted file mode 100644
index 153332a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-
-/**
- * Logically independent unit in processing callbacks for cluster changes
- *
- */
-public interface Stage
-{
-
- /**
- * Initialize a stage
- * @param context
- */
- void init(StageContext context);
-
- /**
- * Called before process() on each callback
- */
- void preProcess();
-
- /**
- * Actual callback processing logic
- * @param event
- * @throws Exception
- */
- public void process(ClusterEvent event) throws Exception;
-
- /**
- * Called after process() on each callback
- */
- void postProcess();
-
- /**
- * Destruct a stage
- */
- void release();
-
- /**
- * Get the name of the stage
- * @return
- */
- public String getStageName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
deleted file mode 100644
index e400244..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-public class StageContext
-{
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
deleted file mode 100644
index 40aa0a0..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-public class StageException extends Exception
-{
-
- public StageException(String message)
- {
- super(message);
- }
- public StageException(String message,Exception e)
- {
- super(message,e);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
deleted file mode 100644
index 664d204..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix controller pipeline classes that process cluster changes
- *
- */
-package com.linkedin.helix.controller.pipeline;
\ No newline at end of file