You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[7/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
new file mode 100644
index 0000000..d307c2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class Operator {
+	// Tuple-list count settings; subclasses such as SumOperator assign them in their constructors.
+	public int minInputTupleLists;
+	public int maxInputTupleLists;
+	public int numOutputTupleLists = -1;
+	public boolean inputOutputTupleListsCountsEqual = false;
+	
+	public Operator()
+	{
+		// nothing to set up; fields keep their declared defaults
+	}
+	/** Multiplies two tuples element-wise, right-aligned; if either argument is null the other is returned as-is. */
+	public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2)
+	{
+		if (tup1 == null) {
+			return tup2;
+		}
+		if (tup2 == null) {
+			return tup1;
+		}
+		Tuple<String>outputTup = new Tuple<String>();
+		
+
+		//product staggers if the tuples are not the same length
+		//e.g. 1,2,3 * 4,5 = 1,8.0,15.0 (products are formatted as doubles)
+		//so this is a bit tricky
+		Tuple<String>largerTup;
+		Tuple<String>smallerTup;
+		if (tup1.size() >= tup2.size()) {
+			largerTup = tup1;
+			smallerTup = tup2;
+		}
+		else {
+			largerTup = tup2;
+			smallerTup = tup1;
+		}		
+		int gap = largerTup.size() - smallerTup.size();
+		// the first 'gap' elements of the longer tuple have no partner and are copied through unchanged
+		for (int i=0; i< largerTup.size();i++) {
+			if (i < gap) {
+				outputTup.add(largerTup.getElement(i));
+			}
+			else {
+				double elementProduct = 0;
+				elementProduct = Double.parseDouble(largerTup.getElement(i)) *
+						Double.parseDouble(smallerTup.getElement(i-gap));
+				outputTup.add(String.valueOf(elementProduct));
+			}
+		}
+		return outputTup;
+	}
+	/** Adds two tuples element-wise, right-aligned; if either argument is null the other is returned as-is. */
+	public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2)
+	{
+		if (tup1 == null) {
+			return tup2;
+		}
+		if (tup2 == null) {
+			return tup1;
+		}
+		Tuple<String>outputTup = new Tuple<String>();
+		
+
+		//sum staggers if the tuples are not the same length
+		//e.g. 1,2,3 + 4,5 = 1,6.0,8.0 (sums are formatted as doubles)
+		//so this is a bit tricky
+		Tuple<String>largerTup;
+		Tuple<String>smallerTup;
+		if (tup1.size() >= tup2.size()) {
+			largerTup = tup1;
+			smallerTup = tup2;
+		}
+		else {
+			largerTup = tup2;
+			smallerTup = tup1;
+		}		
+		int gap = largerTup.size() - smallerTup.size();
+		// the first 'gap' elements of the longer tuple have no partner and are copied through unchanged
+		for (int i=0; i< largerTup.size();i++) {
+			if (i < gap) {
+				outputTup.add(largerTup.getElement(i));
+			}
+			else {
+				double elementSum = 0;
+				elementSum = Double.parseDouble(largerTup.getElement(i)) +
+						Double.parseDouble(smallerTup.getElement(i-gap));
+				outputTup.add(String.valueOf(elementSum));
+			}
+		}
+		return outputTup;
+	}
+	/** Applies this operator to the given tuple iterators; each subclass defines the result. */
+	public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
new file mode 100644
index 0000000..b5d94f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public class Stat {
+	String _name;
+	Tuple<String> _value;
+	Tuple<String> _timestamp;
+	/** Stores the given name, value tuple and timestamp tuple as-is (no copies are made). */
+	public Stat(String name, Tuple<String> value, Tuple<String> timestamp)
+	{
+		_name = name;
+		_value = value;
+		_timestamp = timestamp;
+	}
+	/** Returns the name passed to the constructor. */
+	public String getName()
+	{
+		return _name;
+	}
+	/** Returns the value tuple passed to the constructor. */
+	public Tuple<String> getValue()
+	{
+		return _value;
+	}
+	/** Returns the timestamp tuple passed to the constructor. */
+	public Tuple<String> getTimestamp()
+	{
+		return _timestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
new file mode 100644
index 0000000..da45fca
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
@@ -0,0 +1,358 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.apache.helix.model.PersistentStats;
+import org.apache.log4j.Logger;
+
+
+public class StatsHolder
+{
+  enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
+  
+  private static final Logger logger = Logger.getLogger(StatsHolder.class
+      .getName());
+  // keys of the two fields stored in each stat's map
+  public static final String VALUE_NAME = "value";
+  public static final String TIMESTAMP_NAME = "TimeStamp";
+
+  HelixDataAccessor _accessor;
+  HealthDataCache _cache;
+  // _statMap: stat name -> its fields; _statAlertMatchResult: incoming stat name -> (persisted stat key -> cached match result)
+  Map<String, Map<String, String>> _statMap;
+  Map<String, Map<String, MatchResult>> _statAlertMatchResult;
+
+  private Builder _keyBuilder;
+  // PersistentStats _persistentStats;
+
+  public StatsHolder(HelixManager manager, HealthDataCache cache)
+  {
+    _accessor = manager.getHelixDataAccessor();
+    _cache = cache;
+    _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
+    updateCache(_cache);
+    _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
+    
+  }
+  /** Refreshes the cache through the accessor, then reloads the stat map from it. */
+  public void refreshStats()
+  {
+    logger.info("Refreshing cached stats");
+    _cache.refresh(_accessor);
+    updateCache(_cache);
+  }
+  /** Writes the in-memory stat map into the persistent stats record through the accessor. */
+  public void persistStats()
+  {
+    // XXX: Am I using _accessor too directly here?
+    // took around 35 ms from desktop to ESV4 machine
+    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
+    if (stats == null)
+    {
+      stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
+                                                         // this record, if it
+                                                         // matters
+    }
+    stats.getRecord().setMapFields(_statMap);
+    boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(),
+        stats);
+  }
+  /** Loads the stat map from the cache (refreshing it first if asked) and prints the elapsed millis to stdout. */
+  public void getStatsFromCache(boolean refresh)
+  {
+    long refreshStartTime = System.currentTimeMillis();
+    if (refresh) {
+      _cache.refresh(_accessor);
+    }
+    PersistentStats persistentStatRecord = _cache.getPersistentStats();
+    if (persistentStatRecord != null) {
+      _statMap = persistentStatRecord.getMapFields();
+    }
+    else {
+      _statMap = new HashMap<String,Map<String,String>>();
+    }
+    /*
+		if (_cache.getPersistentStats() != null) {
+
+			_statMap = _cache.getPersistentStats();
+		}
+     */
+    //TODO: confirm this a good place to init the _statMap when null
+    /*
+		if (_statMap == null) {
+			_statMap = new HashMap<String, Map<String, String>>();
+		}
+     */
+    System.out.println("Refresh stats done: "+(System.currentTimeMillis() - refreshStartTime));
+  }
+  /** Not implemented: always returns null. */
+  public Iterator<String> getAllStats()
+  {
+    return null;
+  }
+
+  /*
+   * Merges incomingStat into existingStat using the aggregator named in statName.
+   * TODO: figure out pre-conditions here; null maps are rejected with HelixException
+   */
+  public Map<String, String> mergeStats(String statName,
+      Map<String, String> existingStat, Map<String, String> incomingStat)
+      throws HelixException
+  {
+    if (existingStat == null)
+    {
+      throw new HelixException("existing stat for merge is null");
+    }
+    if (incomingStat == null)
+    {
+      throw new HelixException("incoming stat for merge is null");
+    }
+    // get agg type and arguments, then get agg object
+    String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
+    String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
+    Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
+    // XXX: a map lacking the TimeStamp/value key leaves its tuple null below; the size() checks after merge would then NPE
+
+    // get timestamps, values out of zk maps
+    String existingTime = existingStat.get(TIMESTAMP_NAME);
+    String existingVal = existingStat.get(VALUE_NAME);
+    String incomingTime = incomingStat.get(TIMESTAMP_NAME);
+    String incomingVal = incomingStat.get(VALUE_NAME);
+    // parse values into tuples, if the values exist. else, tuples are null
+    Tuple<String> existingTimeTuple = (existingTime != null) ? Tuple
+        .fromString(existingTime) : null;
+    Tuple<String> existingValueTuple = (existingVal != null) ? Tuple
+        .fromString(existingVal) : null;
+    Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
+        .fromString(incomingTime) : null;
+    Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
+        .fromString(incomingVal) : null;
+
+    // do merge; the merged result is read back from the existing* tuples below
+    agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
+        incomingTimeTuple, aggArgs);
+    // put merged tuples back in map
+    Map<String, String> mergedMap = new HashMap<String, String>();
+    if (existingTimeTuple.size() == 0)
+    {
+      throw new HelixException("merged time tuple has size zero");
+    }
+    if (existingValueTuple.size() == 0)
+    {
+      throw new HelixException("merged value tuple has size zero");
+    }
+
+    mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
+    mergedMap.put(VALUE_NAME, existingValueTuple.toString());
+    return mergedMap;
+  }
+
+  /*
+   * Find all persisted stats this stat matches. Update those stats. An incoming
+   * stat can match multiple stats exactly (if that stat has multiple agg types)
+   * An incoming stat can match multiple wildcard stats
+   */
+
+  // need to do a time check here!
+
+  public void applyStat(String incomingStatName, Map<String, String> statFields)
+  {
+    // TODO: consider locking stats here
+    //refreshStats(); //will have refreshed by now during stage
+
+    Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
+    
+    if(!_statAlertMatchResult.containsKey(incomingStatName))
+    {
+      _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
+    }
+    Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
+    // traverse through all persistent stats
+    for (String key : _statMap.keySet())
+    {
+      if(resultMap.containsKey(key))
+      {
+        MatchResult cachedMatchResult = resultMap.get(key);
+        if(cachedMatchResult == MatchResult.EXACTMATCH)
+        {
+          processExactMatch(key, statFields);
+        }
+        else if(cachedMatchResult == MatchResult.WILDCARDMATCH)
+        {
+          processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
+        }
+        // don't care about NOMATCH
+        continue;
+      }
+      // exact match on stat and stat portion of persisted stat, just update
+      if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName))
+      {
+        processExactMatch(key, statFields);
+        resultMap.put(key, MatchResult.EXACTMATCH);
+      }
+      // wildcard match
+      else if (ExpressionParser.isIncomingStatWildcardMatch(key,
+          incomingStatName))
+      {
+        processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
+        resultMap.put(key, MatchResult.WILDCARDMATCH);
+      }
+      else
+      {
+        resultMap.put(key, MatchResult.NOMATCH);
+      }
+    }
+    _statMap.putAll(pendingAdds);
+  } 
+  /** Merges statFields into the persisted stat stored under key and stores the merged result back. */
+  void processExactMatch(String key, Map<String, String> statFields)
+  {
+    Map<String, String> mergedStat = mergeStats(key, _statMap.get(key),
+        statFields);
+    // update in place, no problem with hash map
+    _statMap.put(key, mergedStat);
+  }
+  /** Derives the concrete stat name for a wildcard match and queues it in pendingAdds if it is new. */
+  void processWildcardMatch(String incomingStatName, String key, 
+      Map<String, String> statFields,  Map<String, Map<String, String>> pendingAdds)
+  {
+
+    // make sure incoming stat doesn't already exist, either in previous
+    // round or this round
+    // form new key (incomingStatName with agg type from the wildcarded
+    // stat)
+    String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
+        incomingStatName);
+    // if the stat already existed in _statMap, we have/will apply it as an
+    // exact match
+    // if the stat was added this round to pendingAdds, no need to recreate
+    // (it would have same value)
+    if (!_statMap.containsKey(statToAdd)
+        && !pendingAdds.containsKey(statToAdd))
+    {
+      // add this stat to persisted stats
+      Map<String, String> mergedStat = mergeStats(statToAdd,
+          getEmptyStat(), statFields);
+      // add to pendingAdds so we don't mess up ongoing traversal of
+      // _statMap
+      pendingAdds.put(statToAdd, mergedStat);
+    }
+  }
+
+  // add parsing of stat (or is that in expression holder?) at least add
+  // validate
+  public void addStat(String exp) throws HelixException
+  {
+    refreshStats(); // get current stats
+
+    String[] parsedStats = ExpressionParser.getBaseStats(exp);
+
+    for (String stat : parsedStats)
+    {
+      if (_statMap.containsKey(stat))
+      {
+        logger.debug("Stat " + stat + " already exists; not adding");
+        continue;
+      }
+      _statMap.put(stat, getEmptyStat()); // add new stat to map
+    }
+  }
+  /** Builds a fresh map holding an empty stat for every base stat parsed from exp. */
+  public static Map<String, Map<String, String>> parseStat(String exp)
+      throws HelixException
+  {
+    String[] parsedStats = ExpressionParser.getBaseStats(exp);
+    Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
+
+    for (String stat : parsedStats)
+    {
+      if (statMap.containsKey(stat))
+      {
+        logger.debug("Stat " + stat + " already exists; not adding");
+        continue;
+      }
+      statMap.put(stat, getEmptyStat()); // add new stat to map
+    }
+    return statMap;
+  }
+
+  /** Returns a new stat map whose timestamp and value fields are both the empty string. */
+  public static Map<String, String> getEmptyStat()
+  {
+    Map<String, String> statFields = new HashMap<String, String>();
+    statFields.put(TIMESTAMP_NAME, "");
+    statFields.put(VALUE_NAME, "");
+    return statFields;
+  }
+  /** Converts every entry of the stat map into a Stat (name, value tuple, timestamp tuple). */
+  public List<Stat> getStatsList()
+  {
+    List<Stat> stats = new LinkedList<Stat>();
+    for (String stat : _statMap.keySet())
+    {
+      Map<String, String> statFields = _statMap.get(stat);
+      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
+      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
+      Stat s = new Stat(stat, valTup, timeTup);
+      stats.add(s);
+    }
+    return stats;
+  }
+  /** Maps each stat name to its parsed value tuple; the timestamp tuple is parsed but not returned. */
+  public Map<String, Tuple<String>> getStatsMap()
+  {
+    //refreshStats(); //don't refresh, stage will have refreshed by this time
+    HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
+    for (String stat : _statMap.keySet())
+    {
+      Map<String, String> statFields = _statMap.get(stat);
+      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
+      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
+      stats.put(stat, valTup);
+    }
+    return stats;
+  }
+  /** Adopts the given cache and reloads the stat map from its persistent stats (empty map if there are none). */
+  public void updateCache(HealthDataCache cache)
+  {
+    _cache = cache;
+    PersistentStats persistentStatRecord = _cache.getPersistentStats();
+    if (persistentStatRecord != null)
+    {
+      _statMap = persistentStatRecord.getMapFields();
+    }
+    else
+    {
+      _statMap = new HashMap<String, Map<String, String>>();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
new file mode 100644
index 0000000..1028be3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class SumEachOperator extends Operator {
+	// accepts one or more input lists and produces one output list per input list
+	public SumEachOperator() {
+		minInputTupleLists = 1;
+		maxInputTupleLists = Integer.MAX_VALUE;
+		inputOutputTupleListsCountsEqual = true;
+		numOutputTupleLists = -1;
+	}
+
+	//for each input iterator, emit a one-element iterator holding the sum of its tuples (null if it had none)
+	@Override
+	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+		List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
+		for (Iterator<Tuple<String>> currIt : input) {
+			Tuple<String> currSum = null;
+			while (currIt.hasNext()) {
+				currSum = sumTuples(currSum, currIt.next());
+			}
+			ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
+			currOutList.add(currSum);
+			out.add(currOutList.iterator());
+		}
+		return out;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
new file mode 100644
index 0000000..2b6cd89
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class SumOperator extends Operator {
+	// accepts one or more input lists and always produces a single output list
+	public SumOperator() {
+		minInputTupleLists = 1;
+		maxInputTupleLists = Integer.MAX_VALUE;
+		inputOutputTupleListsCountsEqual = false;
+		numOutputTupleLists = 1;
+	}
+
+	/** Wraps the given list's iterator in a new single-element list. */
+	public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) 
+	{
+		List out = new ArrayList();
+		out.add(input.iterator());
+		return out;
+	}
+	/** Sums the next tuple of every input iterator into one row, repeating until any iterator is exhausted. */
+	@Override
+	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+		ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
+		if (input == null || input.size() == 0) {
+			return singleSetToIter(output);
+		}
+		while (true) { //loop through set of iters, return when 1 runs out (not completing the row in progress)
+			Tuple<String> rowSum = null;
+			for (Iterator<Tuple<String>> it : input) {
+				if (!it.hasNext()) { //when any iterator runs out, we are done
+					return singleSetToIter(output);
+				}
+				rowSum = sumTuples(rowSum, it.next());
+			}
+			output.add(rowSum);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
new file mode 100644
index 0000000..2526633
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+public class Tuple<T> {
+	List<T> elements;
+	/** Creates an empty tuple backed by an ArrayList. */
+	public Tuple() 
+	{
+		elements = new ArrayList<T>();
+	}
+	/** Returns the number of elements. */
+	public int size()
+	{
+		return elements.size();
+	}
+	/** Appends one element at the end. */
+	public void add(T entry)
+	{
+		elements.add(entry);
+	}
+	/** Appends every element of the other tuple, in order. */
+	public void addAll(Tuple<T> incoming)
+	{
+		elements.addAll(incoming.getElements());
+	}
+	/** Returns an iterator over the backing list, first element first. */
+	public Iterator<T> iterator()
+	{
+		return elements.listIterator();
+	}
+	/** Returns the element at the given zero-based index of the backing list. */
+	public T getElement(int ind)
+	{
+		return elements.get(ind);
+	}
+	/** Returns the backing list itself, not a copy. */
+	public List<T> getElements()
+	{
+		return elements;
+	}
+	/** Removes all elements. */
+	public void clear() 
+	{
+		elements.clear();
+	}
+	/** Splits the string on "," into a tuple; "" gives an empty tuple. The argument is dereferenced, so it must not be null. */
+	public static Tuple<String> fromString(String in) 
+	{
+		Tuple<String> tup = new Tuple<String>();
+		if (in.length() > 0) {
+			String[] elements = in.split(",");
+			for (String element : elements) {
+				tup.add(element);
+			}
+		}
+		return tup;
+	}
+	/** Joins the elements with "," and no surrounding brackets. */
+	public String toString() 
+	{
+		StringBuilder out = new StringBuilder();
+		Iterator<T> it = iterator();
+		boolean outEmpty=true;
+		while (it.hasNext()) {
+			if (!outEmpty) {
+				out.append(",");
+			}
+			out.append(it.next());
+			outEmpty = false;
+		}
+		return out.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
new file mode 100644
index 0000000..a60605e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+public class WindowAggregator extends Aggregator {
+
+	// number of most recent entries kept; overwritten from args[0] on every merge() call
+	int _windowSize;
+	/** Parses the window size from its string form and sets _numArgs to 1. */
+	public WindowAggregator(String windowSize) 
+	{
+		_windowSize = Integer.parseInt(windowSize);
+		_numArgs = 1;
+	}
+	/** Creates an aggregator with a window size of 1. */
+	public WindowAggregator()
+	{
+		this("1");
+	}
+	// Merges the new tuples into the curr tuples in place, keeping only the last _windowSize entries overall.
+	@Override
+	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+		
+		_windowSize = Integer.parseInt(args[0]);
+		
+		//figure out how many curr tuple values we displace
+		Tuple<String> mergedTimeTuple = new Tuple<String>();
+		Tuple<String> mergedValTuple = new Tuple<String>();
+		
+		Iterator<String> currTimeIter = currTimeTup.iterator();
+		Iterator<String> currValIter = currValTup.iterator();
+		Iterator<String> newTimeIter = newTimeTup.iterator();
+		Iterator<String> newValIter = newValTup.iterator();
+		int currCtr = 0;
+		//traverse current vals
+		double currTime = -1;
+		double currVal;
+		while (currTimeIter.hasNext()) {
+			currTime = Double.parseDouble(currTimeIter.next());
+			currVal = Double.parseDouble(currValIter.next());
+			currCtr++;
+			//number of evicted currVals equal to total size of both minus _windowSize
+			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element, just bump down
+				mergedTimeTuple.add(String.valueOf(currTime));
+				mergedValTuple.add(String.valueOf(currVal));
+			}
+		}
+		// currTime now holds the last curr timestamp, or -1 if there were none
+		double newVal;
+		double newTime;
+		while (newTimeIter.hasNext()) {
+			newVal = Double.parseDouble(newValIter.next());
+			newTime = Double.parseDouble(newTimeIter.next());
+			if (newTime <= currTime) { //a new time is not later than the newest curr time.  we will not apply new tuple!
+				return; //curr tuples remain the same
+			}
+			currCtr++;
+			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element
+				mergedTimeTuple.add(String.valueOf(newTime));
+				mergedValTuple.add(String.valueOf(newVal));
+			}
+		}
+		 //set curr tuples to merged tuples
+		currTimeTup.clear();
+		currTimeTup.addAll(mergedTimeTuple);
+		currValTup.clear();
+		currValTup.addAll(mergedValTuple);
+		//TODO: see if we can do merger in place on curr
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
new file mode 100644
index 0000000..5ff2e7d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Classes for Helix alerts
+ */
+package org.apache.helix.alerts;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java b/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
new file mode 100644
index 0000000..21b6a4e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/ExternalViewGenerator.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.CurrentState.CurrentStateProperty;
+import org.apache.log4j.Logger;
+
+
+/*
+ * ZKRoutingInfoProvider keeps a copy of the routing table. Given a partition id,
+ * it will return
+ *
+ * 1. The list of partitions that can be read
+ * 2. The master partition, for write operations
+ *
+ * The routing table is constructed from the currentState of each storage node.
+ * The current state is a list of the following pairs: partition-id:State(MASTER / SLAVE)
+ *
+ * TODO: move the code as part of router process
+ * TODO: add listeners to node current state changes
+ * */
+public class ExternalViewGenerator
+{
+  static Logger _logger = Logger.getLogger(ExternalViewGenerator.class);
+
+  /*
+   * Given a list of external view ZNRecord nodes(one for each cluster),
+   * calculate the routing map.
+   *
+   * The format of the routing map is like this:
+   *
+   * The outer Map<String, Map<String, Set<String>>> maps from a partitionName to
+   * its states map. The inner Map<String, Set<String>> maps from a state
+   * ("MASTER", "SLAVE"...) to the set of nodeNames in that state.
+   *
+   * This lets us query the map for the set of nodes by providing the
+   * partition name and the expected state.
+   */
+  public Map<String, Map<String, Set<String>>> getRouterMapFromExternalView(
+      List<ZNRecord> dbExternalViewList)
+  {
+    Map<String, Map<String, Set<String>>> result = new TreeMap<String, Map<String, Set<String>>>();
+
+    for (ZNRecord dbNodeView : dbExternalViewList)
+    {
+      Map<String, Map<String, String>> dbNodeStateMap = dbNodeView
+          .getMapFields();
+      for (String partitionId : dbNodeStateMap.keySet())
+      {
+        if (!result.containsKey(partitionId))
+        {
+          result.put(partitionId, new TreeMap<String, Set<String>>());
+        }
+        Map<String, String> nodeStateMap = dbNodeStateMap.get(partitionId);
+        for (String nodeName : nodeStateMap.keySet())
+        {
+          String state = nodeStateMap.get(nodeName);
+          if (!result.get(partitionId).containsKey(state))
+          {
+            result.get(partitionId).put(state, new TreeSet<String>());
+          }
+          result.get(partitionId).get(state).add(nodeName);
+        }
+      }
+    }
+    return result;
+  }
+
+  /*
+   * The parameter is a map that maps the nodeName to a list of ZNRecords.
+   */
+  public List<ZNRecord> computeExternalView(
+      Map<String, List<ZNRecord>> currentStates, List<ZNRecord> idealStates)
+  {
+    List<ZNRecord> resultList = new ArrayList<ZNRecord>();
+    Map<String, ZNRecord> resultRoutingTable = new HashMap<String, ZNRecord>();
+    // maps from dbName to another map : partition -> map <nodename,
+    // master/slave>;
+    // Fill the routing table with "empty" default state according to ideals
+    // states
+    // in the cluster
+    if (idealStates != null)
+    {
+      for (ZNRecord idealState : idealStates)
+      {
+        ZNRecord defaultDBExternalView = new ZNRecord(idealState.getId());
+        resultRoutingTable.put(idealState.getId(), defaultDBExternalView);
+      }
+    } else
+    {
+      assert (!currentStates.isEmpty());
+      return resultList;
+    }
+    for (String nodeName : currentStates.keySet())
+    {
+      List<ZNRecord> zndbStates = currentStates.get(nodeName);
+      for (ZNRecord dbNodeStateRecord : zndbStates)
+      {
+        Map<String, Map<String, String>> dbStates = dbNodeStateRecord
+            .getMapFields();
+        for (String stateUnitKey : dbStates.keySet())
+        {
+          Map<String, String> dbPartitionStates = dbStates.get(stateUnitKey);
+          String dbName = dbPartitionStates
+              .get(Message.Attributes.RESOURCE_NAME.toString());
+          ZNRecord partitionStatus = resultRoutingTable.get(dbName);
+          if (partitionStatus == null)
+          {
+            partitionStatus = new ZNRecord(dbName);
+            resultRoutingTable.put(dbName, partitionStatus);
+          }
+          String currentStateKey = CurrentStateProperty.CURRENT_STATE.toString();
+
+          if (!partitionStatus.getMapFields().containsKey(stateUnitKey))
+          {
+            partitionStatus.setMapField(stateUnitKey,
+                new TreeMap<String, String>());
+          }
+          partitionStatus.getMapField(stateUnitKey).put(nodeName,
+              dbPartitionStates.get(currentStateKey));
+
+        }
+      }
+    }
+    for (ZNRecord record : resultRoutingTable.values())
+    {
+      resultList.add(record);
+    }
+    return resultList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
new file mode 100644
index 0000000..8d82c2e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -0,0 +1,608 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.PipelineRegistry;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CompatibilityCheckStage;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ExternalViewComputeStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * The cluster controller's main goal is to keep the cluster state as close as possible to
+ * the Ideal State. It does this by listening to changes in cluster state and scheduling new
+ * tasks to bring the cluster state to the best possible ideal state. Every instance of this
+ * class can control only one cluster.
+ *
+ *
+ * Get all the partitions using IdealState, CurrentState and Messages <br>
+ * foreach partition <br>
+ * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
+ * 2. compute best possible state (instance,state) pair. This needs previous step data and
+ * state model constraints <br>
+ * 3. compute the messages/tasks needed to move from 1 to 2 <br>
+ * 4. select the messages that can be sent, needs messages and state model constraints <br>
+ * 5. send messages
+ */
+public class GenericHelixController implements
+    ConfigChangeListener,
+    IdealStateChangeListener,
+    LiveInstanceChangeListener,
+    MessageListener,
+    CurrentStateChangeListener,
+    ExternalViewChangeListener,
+    ControllerChangeListener,
+    HealthStateChangeListener
+{
+  private static final Logger    logger =
+                                            Logger.getLogger(GenericHelixController.class.getName());
+  volatile boolean               init   = false;
+  private final PipelineRegistry _registry;
+
+  /**
+   * Since instance current state is per-session-id, we need to track the session-ids of
+   * the current states that the ClusterController is observing. this set contains all the
+   * session ids that we add currentState listener
+   */
+  private final Set<String>      _instanceCurrentStateChangeSubscriptionSessionIds;
+
+  /**
+   * this set contains all the instance names that we add message listener
+   */
+  private final Set<String>      _instanceSubscriptionNames;
+
+  ClusterStatusMonitor           _clusterStatusMonitor;
+  
+
+  /**
+   * The _paused flag is checked by handleEvent(): if the flag is set, handleEvent()
+   * is a no-op. Other event handling logic stays the same when the flag
+   * is set.
+   */
+  private boolean                _paused;
+
+  /**
+   * The timer that can periodically run the rebalancing pipeline. The timer will start if
+   * at least one resource group is configured to use the timer.
+   */
+  Timer _rebalanceTimer = null;
+  int _timerPeriod = Integer.MAX_VALUE;
+
+  /**
+   * Default constructor that creates a default pipeline registry. This is sufficient in
+   * most cases, but if something specific is needed, use the other constructor,
+   * which accepts a pipeline registry.
+   */
+  public GenericHelixController()
+  {
+    this(createDefaultRegistry());
+  }
+
+  class RebalanceTask extends TimerTask
+  {
+    HelixManager _manager;
+    
+    public RebalanceTask(HelixManager manager)
+    {
+      _manager = manager;
+    }
+    
+    @Override
+    public void run()
+    {
+      NotificationContext changeContext = new NotificationContext(_manager);
+      changeContext.setType(NotificationContext.Type.CALLBACK);
+      ClusterEvent event = new ClusterEvent("periodicalRebalance");
+      event.addAttribute("helixmanager", changeContext.getManager());
+      event.addAttribute("changeContext", changeContext);
+      List<ZNRecord> dummy = new ArrayList<ZNRecord>();
+      event.addAttribute("eventData", dummy);
+      // Should be able to process  
+      handleEvent(event);
+    }
+  }
+  
+  /**
+   * Starts the rebalancing timer with the specified period. Start the timer if necessary;
+   * If the period is smaller than the current period, cancel the current timer and use 
+   * the new period.
+   */
+  void startRebalancingTimer(int period, HelixManager manager)
+  {
+    logger.info("Controller starting timer at period " + period);
+    if(period < _timerPeriod)
+    {
+      if(_rebalanceTimer != null)
+      {
+        _rebalanceTimer.cancel();
+      }
+      _rebalanceTimer = new Timer(true);
+      _timerPeriod = period;
+      _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
+    }
+    else
+    {
+      logger.info("Controller already has timer at period " + _timerPeriod);
+    }
+  }
+  
+  /**
+   * Stops the rebalancing timer and resets the timer period.
+   */
+  void stopRebalancingTimer()
+  {
+    if(_rebalanceTimer != null)
+    {
+      _rebalanceTimer.cancel();
+      _rebalanceTimer = null;
+    }
+    _timerPeriod = Integer.MAX_VALUE;
+  }
+  
+  private static PipelineRegistry createDefaultRegistry()
+  {
+    logger.info("createDefaultRegistry");
+    synchronized (GenericHelixController.class)
+    {
+      PipelineRegistry registry = new PipelineRegistry();
+
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline();
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // rebalance pipeline
+      Pipeline rebalancePipeline = new Pipeline();
+      rebalancePipeline.addStage(new ResourceComputationStage());
+      rebalancePipeline.addStage(new CurrentStateComputationStage());
+      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageSelectionStage());
+      rebalancePipeline.addStage(new MessageThrottleStage());
+      rebalancePipeline.addStage(new TaskAssignmentStage());
+
+      // external view generation
+      Pipeline externalViewPipeline = new Pipeline();
+      externalViewPipeline.addStage(new ExternalViewComputeStage());
+
+      // backward compatibility check
+      Pipeline liveInstancePipeline = new Pipeline();
+      liveInstancePipeline.addStage(new CompatibilityCheckStage());
+
+      registry.register("idealStateChange", dataRefresh, rebalancePipeline);
+      registry.register("currentStateChange",
+                        dataRefresh,
+                        rebalancePipeline,
+                        externalViewPipeline);
+      registry.register("configChange", dataRefresh, rebalancePipeline);
+      registry.register("liveInstanceChange",
+                        dataRefresh,
+                        liveInstancePipeline,
+                        rebalancePipeline,
+                        externalViewPipeline);
+
+      registry.register("messageChange",
+                        dataRefresh,
+                        rebalancePipeline);
+      registry.register("externalView", dataRefresh);
+      registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
+      registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
+
+      // health stats pipeline
+      // Pipeline healthStatsAggregationPipeline = new Pipeline();
+      // StatsAggregationStage statsStage = new StatsAggregationStage();
+      // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+      // healthStatsAggregationPipeline.addStage(statsStage);
+      // registry.register("healthChange", healthStatsAggregationPipeline);
+
+      return registry;
+    }
+  }
+
+  public GenericHelixController(PipelineRegistry registry)
+  {
+    _paused = false;
+    _registry = registry;
+    _instanceCurrentStateChangeSubscriptionSessionIds =
+        new ConcurrentSkipListSet<String>();
+    _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
+    // _externalViewGenerator = new ExternalViewGenerator();
+  }
+
+  /**
+   * lock-always: caller always needs to obtain an external lock before call, calls to
+   * handleEvent() should be serialized
+   *
+   * @param event
+   */
+  protected synchronized void handleEvent(ClusterEvent event)
+  {
+    HelixManager manager = event.getAttribute("helixmanager");
+    if (manager == null)
+    {
+      logger.error("No cluster manager in event:" + event.getName());
+      return;
+    }
+
+    if (!manager.isLeader())
+    {
+      logger.error("Cluster manager: " + manager.getInstanceName()
+          + " is not leader. Pipeline will not be invoked");
+      return;
+    }
+
+    if (_paused)
+    {
+      logger.info("Cluster is paused. Ignoring the event:" + event.getName());
+      return;
+    }
+
+    NotificationContext context = null;
+    if (event.getAttribute("changeContext") != null)
+    {
+      context = (NotificationContext) (event.getAttribute("changeContext"));
+    }
+
+    // Initialize _clusterStatusMonitor
+    if (context != null)
+    {
+      if (context.getType() == Type.FINALIZE)
+      {
+        if (_clusterStatusMonitor != null)
+        {
+          _clusterStatusMonitor.reset();
+          _clusterStatusMonitor = null;
+        }
+        
+        stopRebalancingTimer();
+        logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
+        return;
+      }
+      else
+      {
+        if (_clusterStatusMonitor == null)
+        {
+          _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
+        }
+        
+        event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
+      }
+    }
+
+    List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
+    if (pipelines == null || pipelines.size() == 0)
+    {
+      logger.info("No pipeline to run for event:" + event.getName());
+      return;
+    }
+
+    for (Pipeline pipeline : pipelines)
+    {
+      try
+      {
+        pipeline.handle(event);
+        pipeline.finish();
+      }
+      catch (Exception e)
+      {
+        logger.error("Exception while executing pipeline: " + pipeline
+            + ". Will not continue to next pipeline", e);
+        break;
+      }
+    }
+  }
+
+  // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
+  // callback
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+                                   NotificationContext changeContext)
+  {
+//    logger.info("START: GenericClusterController.onExternalViewChange()");
+//    ClusterEvent event = new ClusterEvent("externalViewChange");
+//    event.addAttribute("helixmanager", changeContext.getManager());
+//    event.addAttribute("changeContext", changeContext);
+//    event.addAttribute("eventData", externalViewList);
+//    // handleEvent(event);
+//    logger.info("END: GenericClusterController.onExternalViewChange()");
+  }
+
+  @Override
+  public void onStateChange(String instanceName,
+                            List<CurrentState> statesInfo,
+                            NotificationContext changeContext)
+  {
+    logger.info("START: GenericClusterController.onStateChange()");
+    ClusterEvent event = new ClusterEvent("currentStateChange");
+    event.addAttribute("helixmanager", changeContext.getManager());
+    event.addAttribute("instanceName", instanceName);
+    event.addAttribute("changeContext", changeContext);
+    event.addAttribute("eventData", statesInfo);
+    handleEvent(event);
+    logger.info("END: GenericClusterController.onStateChange()");
+  }
+
+  @Override
+  public void onHealthChange(String instanceName,
+                             List<HealthStat> reports,
+                             NotificationContext changeContext)
+  {
+    /**
+     * When there are many participants (> 20, possibly hundreds), this callback can be
+     * called quite frequently, as each participant reports health stats every minute. Thus
+     * the health check pipeline was changed to run in a timer callback.
+     */
+  }
+
+  @Override
+  public void onMessage(String instanceName,
+                        List<Message> messages,
+                        NotificationContext changeContext)
+  {
+    logger.info("START: GenericClusterController.onMessage()");
+    
+    ClusterEvent event = new ClusterEvent("messageChange");
+    event.addAttribute("helixmanager", changeContext.getManager());
+    event.addAttribute("instanceName", instanceName);
+    event.addAttribute("changeContext", changeContext);
+    event.addAttribute("eventData", messages);
+    handleEvent(event);
+    
+    if (_clusterStatusMonitor != null && messages != null)
+    {
+      _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
+    }
+        
+    logger.info("END: GenericClusterController.onMessage()");
+  }
+
+  @Override
+  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+                                   NotificationContext changeContext)
+  {
+    logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+    if (liveInstances == null)
+    {
+      liveInstances = Collections.emptyList();
+    }
+    // Go through the live instance list and make sure that we are observing them
+    // accordingly. The action is done regardless of the paused flag.
+    if (changeContext.getType() == NotificationContext.Type.INIT ||
+        changeContext.getType() == NotificationContext.Type.CALLBACK)
+    {
+      checkLiveInstancesObservation(liveInstances, changeContext);
+    }
+
+    ClusterEvent event = new ClusterEvent("liveInstanceChange");
+    event.addAttribute("helixmanager", changeContext.getManager());
+    event.addAttribute("changeContext", changeContext);
+    event.addAttribute("eventData", liveInstances);
+    handleEvent(event);
+    logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
+  }
+  
+  void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
+  {
+    if (manager.getConfigAccessor() == null)
+    {
+      logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
+      return;
+    }
+    
+    for(IdealState idealState : idealStates)
+    {
+      int period = idealState.getRebalanceTimerPeriod();
+      if(period > 0)
+      {
+        startRebalancingTimer(period, manager);
+      }
+    }
+  }
+  
+  @Override
+  public void onIdealStateChange(List<IdealState> idealStates,
+                                 NotificationContext changeContext)
+  {
+    logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+    ClusterEvent event = new ClusterEvent("idealStateChange");
+    event.addAttribute("helixmanager", changeContext.getManager());
+    event.addAttribute("changeContext", changeContext);
+    event.addAttribute("eventData", idealStates);
+    handleEvent(event);
+    
+    if(changeContext.getType() != Type.FINALIZE)
+    {
+      checkRebalancingTimer(changeContext.getManager(), idealStates);
+    }
+    
+    logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+  }
+
+  @Override
+  public void onConfigChange(List<InstanceConfig> configs,
+                             NotificationContext changeContext)
+  {
+    logger.info("START: GenericClusterController.onConfigChange()");
+    ClusterEvent event = new ClusterEvent("configChange");
+    event.addAttribute("changeContext", changeContext);
+    event.addAttribute("helixmanager", changeContext.getManager());
+    event.addAttribute("eventData", configs);
+    handleEvent(event);
+    logger.info("END: GenericClusterController.onConfigChange()");
+  }
+
+  @Override
+  public void onControllerChange(NotificationContext changeContext)
+  {
+    logger.info("START: GenericClusterController.onControllerChange()");
+    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+
+    // double check if this controller is the leader
+    Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance leader =
+        accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader == null)
+    {
+      logger.warn("No controller exists for cluster:"
+          + changeContext.getManager().getClusterName());
+      return;
+    }
+    else
+    {
+      String leaderName = leader.getInstanceName();
+
+      String instanceName = changeContext.getManager().getInstanceName();
+      if (leaderName == null || !leaderName.equals(instanceName))
+      {
+        logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
+            + leader);
+        return;
+      }
+    }
+
+    PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
+    if (pauseSignal != null)
+    {
+      _paused = true;
+      logger.info("controller is now paused");
+    }
+    else
+    {
+      if (_paused)
+      {
+        // it is currently paused
+        logger.info("controller is now resumed");
+        _paused = false;
+        ClusterEvent event = new ClusterEvent("resume");
+        event.addAttribute("changeContext", changeContext);
+        event.addAttribute("helixmanager", changeContext.getManager());
+        event.addAttribute("eventData", pauseSignal);
+        handleEvent(event);
+      }
+      else
+      {
+        _paused = false;
+      }
+    }
+    logger.info("END: GenericClusterController.onControllerChange()");
+  }
+
+  /**
+   * Go through the list of liveinstances in the cluster, and add currentstateChange
+   * listener and Message listeners to them if they are newly added. For current state
+   * change, the observation is tied to the session id of each live instance.
+   *
+   */
+  protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
+                                               NotificationContext changeContext)
+  {
+    for (LiveInstance instance : liveInstances)
+    {
+      String instanceName = instance.getId();
+      String clientSessionId = instance.getSessionId();
+      HelixManager manager = changeContext.getManager();
+
+      // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
+      // that we've added a currentState listener
+      if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
+      {
+        try
+        {
+          manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
+          _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
+          logger.info("Observing client session id: " + clientSessionId);
+        }
+        catch (Exception e)
+        {
+          logger.error("Exception adding current state and message listener for instance:"
+                           + instanceName,
+                       e);
+        }
+      }
+
+      // _instanceSubscriptionNames contains all the instanceNames that we've added a
+      // message listener
+      if (!_instanceSubscriptionNames.contains(instanceName))
+      {
+        try
+        {
+          logger.info("Adding message listener for " + instanceName);
+          manager.addMessageListener(this, instanceName);
+          _instanceSubscriptionNames.add(instanceName);
+        }
+        catch (Exception e)
+        {
+          logger.error("Exception adding message listener for instance:" + instanceName,
+                       e);
+        }
+      }
+
+      // TODO we need to remove currentState listeners and message listeners
+      // when a session or an instance no longer exists. This may happen
+      // in case of session expiry, participant rebound, participant goes and new
+      // participant comes
+
+      // TODO shi should call removeListener on the previous session id;
+      // but the removeListener with that functionality is not implemented yet
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
new file mode 100644
index 0000000..1bc5cad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+/**
+ * start cluster manager controller
+ * cluster manager controller has two modes:
+ * 1) stand-alone mode: in this mode each controller gets a list of clusters
+ *  and competes via leader election to become the controller for any of the clusters.
+ *  if a controller fails to become the leader of a given cluster, it remains as a standby
+ *  and re-does the leader election when the current leader fails
+ *
+ * 2) distributed mode: in this mode each controller first joins as participant into
+ *   a special CONTROLLER_CLUSTER. Leader election happens in this special
+ *   cluster. The one that becomes the leader controls all controllers (including itself)
+ *   to become leaders of other clusters.
+ */
+
+import java.util.Arrays;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class HelixControllerMain
+{
+  public static final String zkServerAddress = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String help = "help";
+  public static final String mode = "mode";
+  public static final String propertyTransferServicePort = "propertyTransferPort";
+  public static final String name = "controllerName";
+  public static final String STANDALONE = "STANDALONE";
+  public static final String DISTRIBUTED = "DISTRIBUTED";
+  private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
+
+  // hack: OptionBuilder is not thread safe, so option construction is serialized via synchronized
+  @SuppressWarnings("static-access")
+  synchronized private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+    // zookeeper address: required, takes one argument
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+    // cluster name: required, takes one argument
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+    // controller mode: optional, takes one argument
+    Option modeOption = OptionBuilder
+        .withLongOpt(mode)
+        .withDescription(
+            "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
+        .create();
+    modeOption.setArgs(1);
+    modeOption.setRequired(false);
+    modeOption.setArgName("Cluster controller mode (Optional)");
+    // controller name: optional, takes one argument
+    Option controllerNameOption = OptionBuilder.withLongOpt(name)
+        .withDescription("Provide cluster controller name (Optional)").create();
+    controllerNameOption.setArgs(1);
+    controllerNameOption.setRequired(false);
+    controllerNameOption.setArgName("Cluster controller name (Optional)");
+    // property transfer port: optional, takes one argument
+    Option portOption = OptionBuilder
+        .withLongOpt(propertyTransferServicePort)
+        .withDescription(
+            "Webservice port for ZkProperty controller transfer")
+        .create();
+    portOption.setArgs(1);
+    portOption.setRequired(false);
+    portOption.setArgName("Cluster controller property transfer port (Optional)");
+    // collect every option into the set that is returned
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(modeOption);
+    options.addOption(portOption);
+    options.addOption(controllerNameOption);
+
+    return options;
+  }
+
+  /** Prints the usage help for this class's command line, formatted 1000 columns wide. */
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setWidth(1000);
+    formatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  /** Registers the controller for all five change callbacks on the manager; failures are logged, not rethrown. */
+  public static void addListenersToController(HelixManager manager,
+      GenericHelixController controller)
+  {
+    try
+    {
+      manager.addConfigChangeListener(controller);
+      manager.addLiveInstanceChangeListener(controller);
+      manager.addIdealStateChangeListener(controller);
+      manager.addExternalViewChangeListener(controller);
+      manager.addControllerListener(controller);
+    } catch (ZkInterruptedException e)
+    {
+      // pass the exception as the throwable (not string-concatenated) to keep its stack trace
+      logger.warn("zk connection is interrupted during HelixControllerMain.addListenersToController().", e);
+    } catch (Exception e)
+    {
+      logger.error("Error when adding listeners to controller", e);
+    }
+  }
+
+  /**
+   * Creates a Helix manager for the given mode and connects it: STANDALONE as CONTROLLER,
+   * DISTRIBUTED as CONTROLLER_PARTICIPANT with a "LeaderStandby" state model factory
+   * registered before connecting. An unsupported mode or any exception is logged, not thrown.
+   * @param zkConnectString zookeeper connect string handed to the manager factory
+   * @param clusterName cluster to control
+   * @param controllerName instance name of this controller
+   * @param controllerMode STANDALONE or DISTRIBUTED, compared ignoring case
+   * @return the manager (unconnected if connect() threw), or null if none was created
+   */
+  public static HelixManager startHelixController(final String zkConnectString,
+      final String clusterName, final String controllerName, final String controllerMode)
+  {
+    HelixManager manager = null;
+    try
+    {
+      if (controllerMode.equalsIgnoreCase(STANDALONE))
+      {
+        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+            InstanceType.CONTROLLER, zkConnectString);
+        manager.connect();
+      } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
+      {
+        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+            InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
+        DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
+            zkConnectString);
+        StateMachineEngine stateMach = manager.getStateMachineEngine();
+        stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
+        manager.connect();
+      } else
+      {
+        logger.error("cluster controller mode:" + controllerMode + " NOT supported");
+      }
+    } catch (Exception e)
+    {
+      // report through the class logger, with context, instead of printStackTrace()
+      logger.error("Failed to start controller:" + controllerName + " for cluster:" + clusterName, e);
+    }
+    return manager;
+  }
+
+  /**
+   * Command-line entry point: parses the options, optionally starts the property transfer
+   * server, starts the controller and then blocks until the main thread is interrupted.
+   * @param args command-line arguments
+   * @throws IllegalArgumentException if DISTRIBUTED mode is requested without a controller name
+   */
+  public static void main(String[] args) throws Exception
+  {
+    CommandLine cmd = processCommandLineArgs(args);
+    String zkConnectString = cmd.getOptionValue(zkServerAddress);
+    String clusterName = cmd.getOptionValue(cluster);
+    // the mode defaults to STANDALONE when the option is absent
+    String controllerMode = STANDALONE;
+    if (cmd.hasOption(mode))
+    {
+      controllerMode = cmd.getOptionValue(mode);
+    }
+
+    // a port <= 0 (the default) means the property transfer server is not started
+    int propertyTransServicePort = -1;
+    if (cmd.hasOption(propertyTransferServicePort))
+    {
+      propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
+    }
+
+    if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
+    {
+      throw new IllegalArgumentException(
+          "A unique cluster controller name is required in DISTRIBUTED mode");
+    }
+    String controllerName = cmd.getOptionValue(name);
+
+    // Espresso_driver.py will consume this
+    logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
+        + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
+    if (propertyTransServicePort > 0)
+    {
+      ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
+    }
+    HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
+        controllerMode);
+    try
+    {
+      // joining the current thread blocks until this thread is interrupted
+      Thread.currentThread().join();
+    }
+    catch (InterruptedException e)
+    {
+      logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
+          + " interrupted");
+    }
+    finally
+    {
+      // startHelixController() returns null when no manager could be created
+      if (manager != null)
+      {
+        manager.disconnect();
+      }
+      ZKPropertyTransferServer.getInstance().shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
new file mode 100644
index 0000000..ec6f3f6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller;
+
+import java.io.FileFilter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Generic class that will read the data given the root path.
+ * 
+ * @author kgopalak
+ * 
+ */
+public class HierarchicalDataHolder<T>
+{
+  private static final Logger logger = Logger
+      .getLogger(HierarchicalDataHolder.class.getName());
+  AtomicReference<Node<T>> root;
+
+  /**
+   * currentVersion, gets updated when data is read from original source
+   */
+  AtomicLong currentVersion;
+  private final ZkClient _zkClient;
+  private final String _rootPath;
+  private final FileFilter _filter;
+
+  /** Stores the collaborators and immediately loads the tree via refreshData(). */
+  public HierarchicalDataHolder(ZkClient client, String rootPath, FileFilter filter)
+  {
+    _zkClient = client;
+    _rootPath = rootPath;
+    _filter = filter;
+    // the reference holds null until a refresh detects data under the root path
+    root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
+    currentVersion = new AtomicLong(1);
+    refreshData();
+  }
+
+  /** @return the version counter, which refreshData() increments whenever it detects a change */
+  public long getVersion() {
+    return currentVersion.get();
+  }
+
+  public boolean refreshData()
+  {
+    Node<T> newRoot = new Node<T>();
+    boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
+    if (dataChanged)
+    {
+      currentVersion.getAndIncrement();
+      root.set(newRoot);
+      return true;
+    } else
+    {
+      return false;
+    }
+  }
+
+  /**
+   * Populates newRoot (and, recursively, its children) for the given zookeeper path.
+   * If the node's Stat equals the one recorded in oldRoot the old data is reused;
+   * otherwise the data is re-read and the node counts as changed.
+   * @param oldRoot node for this path from the previous refresh, or null if there was none
+   * @param newRoot empty node to fill in for this path
+   * @param path zookeeper path to read
+   * @return true if this node or any descendant changed since the previous refresh
+   */
+  private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot, String path)
+  {
+    Stat newStat = _zkClient.getStat(path);
+    newRoot.name = path;
+    if (newStat == null)
+    {
+      logger.info(path + " does not exist");
+      return false;
+    }
+    Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
+    boolean dataChanged = false;
+    if (oldStat != null && newStat.equals(oldStat))
+    {
+      newRoot.stat = oldStat;
+      newRoot.data = oldRoot.data;
+    } else
+    {
+      dataChanged = true;
+      newRoot.stat = newStat;
+      newRoot.data = _zkClient.<T> readData(path, true);
+    }
+    if (newStat.getNumChildren() > 0)
+    {
+      List<String> children = _zkClient.getChildren(path);
+      for (String child : children)
+      {
+        Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
+        if (newRoot.children == null)
+        {
+          newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
+        }
+        // look the child up by key: ConcurrentHashMap.contains(Object) tests values, not keys
+        Node<T> newChild = newRoot.children.get(child);
+        if (newChild == null)
+        {
+          newChild = new Node<T>();
+          newRoot.children.put(child, newChild);
+        }
+        // non-short-circuit |= so every child is refreshed even after a change was found
+        dataChanged |= refreshRecursively(oldChild, newChild, path + "/" + child);
+      }
+    }
+    return dataChanged;
+  }
+
+  static class Node<T> // one node of the cached tree, filled in by refreshRecursively()
+  {
+    String name; // set to the full path of the node
+    Stat stat; // Stat obtained from the zk client for this path
+    T data; // data read from the zk client for this path
+    ConcurrentHashMap<String, Node<T>> children; // keyed by child name
+    // children stays null until a child is added
+  }
+
+  /** Logs the name and data of every node in the tree, between a START and an END line. */
+  public void print() {
+    logger.info("START "+ _rootPath);
+    LinkedList<Node<T>> pending = new LinkedList<HierarchicalDataHolder.Node<T>>();
+    pending.push(root.get());
+
+    while (!pending.isEmpty()) {
+      Node<T> current = pending.pop();
+      if (current == null) {
+        // the root reference is null when no refresh has detected data yet
+        continue;
+      }
+      logger.info("name:"+ current.name);
+      logger.info("\tdata:"+current.data);
+      if (current.children != null) {
+        // push() adds to the head, so the tree is walked depth-first
+        for (Node<T> child : current.children.values()) {
+          pending.push(child);
+        }
+      }
+    }
+    logger.info("END "+ _rootPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/package-info.java
new file mode 100644
index 0000000..f999c38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Helix cluster controller
+ */
+package org.apache.helix.controller;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
new file mode 100644
index 0000000..c2fb7fa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.Map;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+
+
+/** Stage base class whose lifecycle callbacks do nothing; subclasses override what they need. */
+public class AbstractBaseStage implements Stage
+{
+  /** Does nothing. */
+  @Override
+  public void init(StageContext context)
+  {
+  }
+
+  /** Does nothing. */
+  @Override
+  public void preProcess()
+  {
+  }
+
+  /** Does nothing. */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+  }
+
+  /** Does nothing. */
+  @Override
+  public void postProcess()
+  {
+  }
+
+  /** Does nothing. */
+  @Override
+  public void release()
+  {
+  }
+
+  /** Returns the runtime class name, which is the default stage name. */
+  @Override
+  public String getStageName()
+  {
+    return getClass().getName();
+  }
+
+  /**
+   * Adds the latency to the HelixStageLatencyMonitor stored under this stage's name in the
+   * event's "HelixStageLatencyMonitorMap" attribute; does nothing if the map or entry is absent.
+   * @param event event carrying the monitor map attribute
+   * @param latency latency value handed to the monitor
+   */
+  public void addLatencyToMonitor(ClusterEvent event, long latency)
+  {
+    Map<String, HelixStageLatencyMonitor> monitorsByStage = event.getAttribute("HelixStageLatencyMonitorMap");
+    if (monitorsByStage == null || !monitorsByStage.containsKey(getStageName()))
+    {
+      return;
+    }
+    monitorsByStage.get(getStageName()).addStgLatency(latency);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
new file mode 100644
index 0000000..7362747
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.log4j.Logger;
+
+
+/** An ordered list of stages that is run, stage by stage, for each cluster event. */
+public class Pipeline {
+  private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
+  List<Stage> _stages;
+
+  public Pipeline() {
+    _stages = new ArrayList<Stage>();
+  }
+
+  /**
+   * Appends the stage and initializes it right away.
+   * No StageContext exists at this point, so init() receives null.
+   */
+  public void addStage(Stage stage) {
+    _stages.add(stage);
+    stage.init((StageContext) null);
+  }
+
+  /**
+   * Runs preProcess(), process(event) and postProcess() on every stage, in the order the
+   * stages were added. An exception thrown by a stage stops the run and propagates.
+   */
+  public void handle(ClusterEvent event) throws Exception {
+    if (_stages != null) {
+      for (Stage current : _stages) {
+        current.preProcess();
+        current.process(event);
+        current.postProcess();
+      }
+    }
+  }
+
+  /** Does nothing at present. */
+  public void finish() {
+  }
+
+  /** Returns the internal stage list itself, not a copy. */
+  public List<Stage> getStages() {
+    return _stages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
new file mode 100644
index 0000000..e72fc4c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Keeps, per event name, the list of pipelines registered for that event. */
+public class PipelineRegistry {
+  Map<String, List<Pipeline>> _map;
+
+  public PipelineRegistry() {
+    _map = new HashMap<String, List<Pipeline>>();
+  }
+
+  /**
+   * Appends the pipelines, in argument order, to the list kept for the event name.
+   * @param eventName event the pipelines are registered for
+   * @param pipelines pipelines to append
+   */
+  public void register(String eventName, Pipeline... pipelines) {
+    if (!_map.containsKey(eventName)) {
+      _map.put(eventName, new ArrayList<Pipeline>());
+    }
+    Collections.addAll(_map.get(eventName), pipelines);
+  }
+
+  /**
+   * Looks up the pipelines registered for the event name.
+   * @return the registry's own list for that event, or an immutable empty list if none
+   */
+  public List<Pipeline> getPipelinesForEvent(String eventName) {
+    if (!_map.containsKey(eventName)) {
+      return Collections.emptyList();
+    }
+    return _map.get(eventName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
new file mode 100644
index 0000000..662d573
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+
+/**
+ * Logically independent unit in processing callbacks for cluster changes.
+ * For each callback the hooks run in the order preProcess(), process(), postProcess().
+ */
+public interface Stage
+{
+
+  /**
+   * Initializes the stage.
+   * @param context the stage context
+   */
+  void init(StageContext context);
+
+  /**
+   * Called before process() on each callback.
+   */
+  void preProcess();
+
+  /**
+   * Actual callback processing logic.
+   * @param event the cluster event being processed
+   * @throws Exception if processing fails
+   */
+  void process(ClusterEvent event) throws Exception;
+
+  /**
+   * Called after process() on each callback.
+   */
+  void postProcess();
+
+  /**
+   * Destructs the stage.
+   */
+  void release();
+
+  /**
+   * Gets the name of the stage.
+   * @return the stage name
+   */
+  String getStageName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
new file mode 100644
index 0000000..17ede49
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+public class StageContext
+{
+  // Placeholder: no fields or methods are declared yet.
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
new file mode 100644
index 0000000..0ad9c28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.pipeline;
+
+/** Checked exception type declared by the controller pipeline package. */
+public class StageException extends Exception {
+  /** Creates the exception with a detail message and no cause. */
+  public StageException(String message) {
+    super(message);
+  }
+
+  /** Creates the exception with a detail message and the exception that caused it. */
+  public StageException(String message, Exception e) {
+    super(message, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
new file mode 100644
index 0000000..03f31ba
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix controller pipeline classes that process cluster changes
+ * 
+ */
+package org.apache.helix.controller.pipeline;
\ No newline at end of file