You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC

[9/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
deleted file mode 100644
index 8b1713c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Operator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.Iterator;
-import java.util.List;
-
-public abstract class Operator {
-	
-	public int minInputTupleLists;
-	public int maxInputTupleLists;
-	public int numOutputTupleLists = -1;
-	public boolean inputOutputTupleListsCountsEqual = false;
-	
-	public Operator()
-	{
-		
-	}
-	
-	public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2)
-	{
-		if (tup1 == null) {
-			return tup2;
-		}
-		if (tup2 == null) {
-			return tup1;
-		}
-		Tuple<String>outputTup = new Tuple<String>();
-		
-
-		//sum staggers if the tuples are same length
-		//e.g. 1,2,3 + 4,5 = 1,6,8
-		//so this is a bit tricky
-		Tuple<String>largerTup;
-		Tuple<String>smallerTup;
-		if (tup1.size() >= tup2.size()) {
-			largerTup = tup1;
-			smallerTup = tup2;
-		}
-		else {
-			largerTup = tup2;
-			smallerTup = tup1;
-		}		
-		int gap = largerTup.size() - smallerTup.size();
-		
-		for (int i=0; i< largerTup.size();i++) {
-			if (i < gap) {
-				outputTup.add(largerTup.getElement(i));
-			}
-			else {
-				double elementProduct = 0;
-				elementProduct = Double.parseDouble(largerTup.getElement(i)) *
-						Double.parseDouble(smallerTup.getElement(i-gap));
-				outputTup.add(String.valueOf(elementProduct));
-			}
-		}
-		return outputTup;
-	}
-	
-	public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2)
-	{
-		if (tup1 == null) {
-			return tup2;
-		}
-		if (tup2 == null) {
-			return tup1;
-		}
-		Tuple<String>outputTup = new Tuple<String>();
-		
-
-		//sum staggers if the tuples are same length
-		//e.g. 1,2,3 + 4,5 = 1,6,8
-		//so this is a bit tricky
-		Tuple<String>largerTup;
-		Tuple<String>smallerTup;
-		if (tup1.size() >= tup2.size()) {
-			largerTup = tup1;
-			smallerTup = tup2;
-		}
-		else {
-			largerTup = tup2;
-			smallerTup = tup1;
-		}		
-		int gap = largerTup.size() - smallerTup.size();
-		
-		for (int i=0; i< largerTup.size();i++) {
-			if (i < gap) {
-				outputTup.add(largerTup.getElement(i));
-			}
-			else {
-				double elementSum = 0;
-				elementSum = Double.parseDouble(largerTup.getElement(i)) +
-						Double.parseDouble(smallerTup.getElement(i-gap));
-				outputTup.add(String.valueOf(elementSum));
-			}
-		}
-		return outputTup;
-	}
-	
-	public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
deleted file mode 100644
index 01d9fea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-public class Stat {
-	String _name;
-	Tuple<String> _value;
-	Tuple<String> _timestamp;
-	
-	public Stat(String name, Tuple<String> value, Tuple<String> timestamp)
-	{
-		_name = name;
-		_value = value;
-		_timestamp = timestamp;
-	}
-	
-	public String getName()
-	{
-		return _name;
-	}
-	
-	public Tuple<String> getValue()
-	{
-		return _value;
-	}
-	
-	public Tuple<String> getTimestamp()
-	{
-		return _timestamp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java b/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
deleted file mode 100644
index 43ec957..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.stages.HealthDataCache;
-import com.linkedin.helix.model.PersistentStats;
-
-public class StatsHolder
-{
-  enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
-  
-  private static final Logger logger = Logger.getLogger(StatsHolder.class
-      .getName());
-
-  public static final String VALUE_NAME = "value";
-  public static final String TIMESTAMP_NAME = "TimeStamp";
-
-  HelixDataAccessor _accessor;
-  HealthDataCache _cache;
-
-  Map<String, Map<String, String>> _statMap;
-  Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
-  private Builder _keyBuilder;
-  // PersistentStats _persistentStats;
-
-  public StatsHolder(HelixManager manager, HealthDataCache cache)
-  {
-    _accessor = manager.getHelixDataAccessor();
-    _cache = cache;
-    _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
-    updateCache(_cache);
-    _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-    
-  }
-
-  public void refreshStats()
-  {
-    logger.info("Refreshing cached stats");
-    _cache.refresh(_accessor);
-    updateCache(_cache);
-  }
-
-  public void persistStats()
-  {
-    // XXX: Am I using _accessor too directly here?
-    // took around 35 ms from desktop to ESV4 machine
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-    if (stats == null)
-    {
-      stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
-                                                         // this record, if it
-                                                         // matters
-    }
-    stats.getRecord().setMapFields(_statMap);
-    boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(),
-        stats);
-  }
-
-  public void getStatsFromCache(boolean refresh)
-  {
-    long refreshStartTime = System.currentTimeMillis();
-    if (refresh) {
-      _cache.refresh(_accessor);
-    }
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null) {
-      _statMap = persistentStatRecord.getMapFields();
-    }
-    else {
-      _statMap = new HashMap<String,Map<String,String>>();
-    }
-    /*
-		if (_cache.getPersistentStats() != null) {
-
-			_statMap = _cache.getPersistentStats();
-		}
-     */
-    //TODO: confirm this a good place to init the _statMap when null
-    /*
-		if (_statMap == null) {
-			_statMap = new HashMap<String, Map<String, String>>();
-		}
-     */
-    System.out.println("Refresh stats done: "+(System.currentTimeMillis() - refreshStartTime));
-  }
-
-  public Iterator<String> getAllStats()
-  {
-    return null;
-  }
-
-  /*
-   * TODO: figure out pre-conditions here. I think not allowing anything to be
-   * null on input
-   */
-  public Map<String, String> mergeStats(String statName,
-      Map<String, String> existingStat, Map<String, String> incomingStat)
-      throws HelixException
-  {
-    if (existingStat == null)
-    {
-      throw new HelixException("existing stat for merge is null");
-    }
-    if (incomingStat == null)
-    {
-      throw new HelixException("incoming stat for merge is null");
-    }
-    // get agg type and arguments, then get agg object
-    String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
-    String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
-    Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
-    // XXX: some of below lines might fail with null exceptions
-
-    // get timestamps, values out of zk maps
-    String existingTime = existingStat.get(TIMESTAMP_NAME);
-    String existingVal = existingStat.get(VALUE_NAME);
-    String incomingTime = incomingStat.get(TIMESTAMP_NAME);
-    String incomingVal = incomingStat.get(VALUE_NAME);
-    // parse values into tuples, if the values exist. else, tuples are null
-    Tuple<String> existingTimeTuple = (existingTime != null) ? Tuple
-        .fromString(existingTime) : null;
-    Tuple<String> existingValueTuple = (existingVal != null) ? Tuple
-        .fromString(existingVal) : null;
-    Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
-        .fromString(incomingTime) : null;
-    Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
-        .fromString(incomingVal) : null;
-
-    // dp merge
-    agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
-        incomingTimeTuple, aggArgs);
-    // put merged tuples back in map
-    Map<String, String> mergedMap = new HashMap<String, String>();
-    if (existingTimeTuple.size() == 0)
-    {
-      throw new HelixException("merged time tuple has size zero");
-    }
-    if (existingValueTuple.size() == 0)
-    {
-      throw new HelixException("merged value tuple has size zero");
-    }
-
-    mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
-    mergedMap.put(VALUE_NAME, existingValueTuple.toString());
-    return mergedMap;
-  }
-
-  /*
-   * Find all persisted stats this stat matches. Update those stats. An incoming
-   * stat can match multiple stats exactly (if that stat has multiple agg types)
-   * An incoming stat can match multiple wildcard stats
-   */
-
-  // need to do a time check here!
-
-  public void applyStat(String incomingStatName, Map<String, String> statFields)
-  {
-    // TODO: consider locking stats here
-    //refreshStats(); //will have refreshed by now during stage
-
-    Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-    
-    if(!_statAlertMatchResult.containsKey(incomingStatName))
-    {
-      _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
-    }
-    Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
-    // traverse through all persistent stats
-    for (String key : _statMap.keySet())
-    {
-      if(resultMap.containsKey(key))
-      {
-        MatchResult cachedMatchResult = resultMap.get(key);
-        if(cachedMatchResult == MatchResult.EXACTMATCH)
-        {
-          processExactMatch(key, statFields);
-        }
-        else if(cachedMatchResult == MatchResult.WILDCARDMATCH)
-        {
-          processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
-        }
-        // don't care about NOMATCH
-        continue;
-      }
-      // exact match on stat and stat portion of persisted stat, just update
-      if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName))
-      {
-        processExactMatch(key, statFields);
-        resultMap.put(key, MatchResult.EXACTMATCH);
-      }
-      // wildcard match
-      else if (ExpressionParser.isIncomingStatWildcardMatch(key,
-          incomingStatName))
-      {
-        processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
-        resultMap.put(key, MatchResult.WILDCARDMATCH);
-      }
-      else
-      {
-        resultMap.put(key, MatchResult.NOMATCH);
-      }
-    }
-    _statMap.putAll(pendingAdds);
-  } 
-  
-  void processExactMatch(String key, Map<String, String> statFields)
-  {
-    Map<String, String> mergedStat = mergeStats(key, _statMap.get(key),
-        statFields);
-    // update in place, no problem with hash map
-    _statMap.put(key, mergedStat);
-  }
-  
-  void processWildcardMatch(String incomingStatName, String key, 
-      Map<String, String> statFields,  Map<String, Map<String, String>> pendingAdds)
-  {
-
-    // make sure incoming stat doesn't already exist, either in previous
-    // round or this round
-    // form new key (incomingStatName with agg type from the wildcarded
-    // stat)
-    String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
-        incomingStatName);
-    // if the stat already existed in _statMap, we have/will apply it as an
-    // exact match
-    // if the stat was added this round to pendingAdds, no need to recreate
-    // (it would have same value)
-    if (!_statMap.containsKey(statToAdd)
-        && !pendingAdds.containsKey(statToAdd))
-    {
-      // add this stat to persisted stats
-      Map<String, String> mergedStat = mergeStats(statToAdd,
-          getEmptyStat(), statFields);
-      // add to pendingAdds so we don't mess up ongoing traversal of
-      // _statMap
-      pendingAdds.put(statToAdd, mergedStat);
-    }
-  }
-
-  // add parsing of stat (or is that in expression holder?) at least add
-  // validate
-  public void addStat(String exp) throws HelixException
-  {
-    refreshStats(); // get current stats
-
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
-    for (String stat : parsedStats)
-    {
-      if (_statMap.containsKey(stat))
-      {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      _statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-  }
-
-  public static Map<String, Map<String, String>> parseStat(String exp)
-      throws HelixException
-  {
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-    Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
-    for (String stat : parsedStats)
-    {
-      if (statMap.containsKey(stat))
-      {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-    return statMap;
-  }
-
-
-  public static Map<String, String> getEmptyStat()
-  {
-    Map<String, String> statFields = new HashMap<String, String>();
-    statFields.put(TIMESTAMP_NAME, "");
-    statFields.put(VALUE_NAME, "");
-    return statFields;
-  }
-
-  public List<Stat> getStatsList()
-  {
-    List<Stat> stats = new LinkedList<Stat>();
-    for (String stat : _statMap.keySet())
-    {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      Stat s = new Stat(stat, valTup, timeTup);
-      stats.add(s);
-    }
-    return stats;
-  }
-
-  public Map<String, Tuple<String>> getStatsMap()
-  {
-    //refreshStats(); //don't refresh, stage will have refreshed by this time
-    HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
-    for (String stat : _statMap.keySet())
-    {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      stats.put(stat, valTup);
-    }
-    return stats;
-  }
-
-  public void updateCache(HealthDataCache cache)
-  {
-    _cache = cache;
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null)
-    {
-      _statMap = persistentStatRecord.getMapFields();
-    }
-    else
-    {
-      _statMap = new HashMap<String, Map<String, String>>();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 9f3b2a2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
-	public SumEachOperator() {
-		minInputTupleLists = 1;
-		maxInputTupleLists = Integer.MAX_VALUE;
-		inputOutputTupleListsCountsEqual = true;
-		numOutputTupleLists = -1;
-	}
-
-	//for each column, generate sum
-	@Override
-	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-		List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
-		for (Iterator<Tuple<String>> currIt : input) {
-			Tuple<String> currSum = null;
-			while (currIt.hasNext()) {
-				currSum = sumTuples(currSum, currIt.next());
-			}
-			ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
-			currOutList.add(currSum);
-			out.add(currOutList.iterator());
-		}
-		return out;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
deleted file mode 100644
index 760f076..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
-	public SumOperator() {
-		minInputTupleLists = 1;
-		maxInputTupleLists = Integer.MAX_VALUE;
-		inputOutputTupleListsCountsEqual = false;
-		numOutputTupleLists = 1;
-	}
-
-	
-	public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) 
-	{
-		List out = new ArrayList();
-		out.add(input.iterator());
-		return out;
-	}
-	
-	@Override
-	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-		ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-		if (input == null || input.size() == 0) {
-			return singleSetToIter(output);
-		}
-		while (true) { //loop through set of iters, return when 1 runs out (not completing the row in progress)
-			Tuple<String> rowSum = null;
-			for (Iterator<Tuple<String>> it : input) {
-				if (!it.hasNext()) { //when any iterator runs out, we are done
-					return singleSetToIter(output);
-				}
-				rowSum = sumTuples(rowSum, it.next());
-			}
-			output.add(rowSum);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java b/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
deleted file mode 100644
index 9336e47..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.Vector;
-
-public class Tuple<T> {
-	List<T> elements;
-	
-	public Tuple() 
-	{
-		elements = new ArrayList<T>();
-	}
-	
-	public int size()
-	{
-		return elements.size();
-	}
-
-	public void add(T entry)
-	{
-		elements.add(entry);
-	}
-	
-	public void addAll(Tuple<T> incoming)
-	{
-		elements.addAll(incoming.getElements());
-	}
-	
-	public Iterator<T> iterator()
-	{
-		return elements.listIterator();
-	}
-	
-	public T getElement(int ind)
-	{
-		return elements.get(ind);
-	}
-	
-	public List<T> getElements()
-	{
-		return elements;
-	}
-	
-	public void clear() 
-	{
-		elements.clear();
-	}
-	
-	public static Tuple<String> fromString(String in) 
-	{
-		Tuple<String> tup = new Tuple<String>();
-		if (in.length() > 0) {
-			String[] elements = in.split(",");
-			for (String element : elements) {
-				tup.add(element);
-			}
-		}
-		return tup;
-	}
-	
-	public String toString() 
-	{
-		StringBuilder out = new StringBuilder();
-		Iterator<T> it = iterator();
-		boolean outEmpty=true;
-		while (it.hasNext()) {
-			if (!outEmpty) {
-				out.append(",");
-			}
-			out.append(it.next());
-			outEmpty = false;
-		}
-		return out.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 132f23a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.alerts;
-
-import java.util.Iterator;
-
-import com.linkedin.helix.HelixException;
-
-public class WindowAggregator extends Aggregator {
-
-	
-	int _windowSize;
-	
-	public WindowAggregator(String windowSize) 
-	{
-		_windowSize = Integer.parseInt(windowSize);
-		_numArgs = 1;
-	}
-	
-	public WindowAggregator()
-	{
-		this("1");
-	}
-
-	@Override
-	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
-			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
-		
-		_windowSize = Integer.parseInt(args[0]);
-		
-		//figure out how many curr tuple values we displace
-		Tuple<String> mergedTimeTuple = new Tuple<String>();
-		Tuple<String> mergedValTuple = new Tuple<String>();
-		
-		Iterator<String> currTimeIter = currTimeTup.iterator();
-		Iterator<String> currValIter = currValTup.iterator();
-		Iterator<String> newTimeIter = newTimeTup.iterator();
-		Iterator<String> newValIter = newValTup.iterator();
-		int currCtr = 0;
-		//traverse current vals
-		double currTime = -1;
-		double currVal;
-		while (currTimeIter.hasNext()) {
-			currTime = Double.parseDouble(currTimeIter.next());
-			currVal = Double.parseDouble(currValIter.next());
-			currCtr++;
-			//number of evicted currVals equal to total size of both minus _windowSize
-			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element, just bump down
-				mergedTimeTuple.add(String.valueOf(currTime));
-				mergedValTuple.add(String.valueOf(currVal));
-			}
-		}
-
-		double newVal;
-		double newTime;
-		while (newTimeIter.hasNext()) {
-			newVal = Double.parseDouble(newValIter.next());
-			newTime = Double.parseDouble(newTimeIter.next());
-			if (newTime <= currTime) { //oldest new time older than newest curr time.  we will not apply new tuple!
-				return; //curr tuples remain the same
-			}
-			currCtr++;
-			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element
-				mergedTimeTuple.add(String.valueOf(newTime));
-				mergedValTuple.add(String.valueOf(newVal));
-			}
-		}
-		 //set curr tuples to merged tuples
-		currTimeTup.clear();
-		currTimeTup.addAll(mergedTimeTuple);
-		currValTup.clear();
-		currValTup.addAll(mergedValTuple);
-		//TODO: see if we can do merger in place on curr
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java b/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
deleted file mode 100644
index 88a99ae..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/alerts/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Classes for Helix alerts
- */
-package com.linkedin.helix.alerts;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java b/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
deleted file mode 100644
index 28bcb34..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/ExternalViewGenerator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.CurrentState.CurrentStateProperty;
-
-/*
- * ZKRoutingInfoProvider keeps a copy of the routing table. Given a partition id,
- * it will return
- *
- * 1. The list of partition that can be read
- * 2. the master partition, for write operation
- *
- * The routing table is constructed from the currentState of each storage nodes.
- * The current state is a list of following pairs: partition-id:State(MASTER / SLAVE)
- *
- * TODO: move the code as part of router process
- * TODO: add listeners to node current state changes
- * */
-public class ExternalViewGenerator
-{
-  static Logger _logger = Logger.getLogger(ExternalViewGenerator.class);
-
-  /*
-   * Given a list of external view ZNRecord nodes(one for each cluster),
-   * calculate the routing map.
-   *
-   * The format of the routing map is like this:
-   *
-   * Map<String, Map<String, Set<String>>> maps from a partitionName to its
-   * states Map<String, List<String>> The second Map maps from a state
-   * ("MASTER", "SLAVE"...) to a list of nodeNames
-   *
-   * So that the we can query the map for the list of nodes by providing the
-   * partition name and the expected state.
-   */
-  public Map<String, Map<String, Set<String>>> getRouterMapFromExternalView(
-      List<ZNRecord> dbExternalViewList)
-  {
-    Map<String, Map<String, Set<String>>> result = new TreeMap<String, Map<String, Set<String>>>();
-
-    for (ZNRecord dbNodeView : dbExternalViewList)
-    {
-      Map<String, Map<String, String>> dbNodeStateMap = dbNodeView
-          .getMapFields();
-      for (String partitionId : dbNodeStateMap.keySet())
-      {
-        if (!result.containsKey(partitionId))
-        {
-          result.put(partitionId, new TreeMap<String, Set<String>>());
-        }
-        Map<String, String> nodeStateMap = dbNodeStateMap.get(partitionId);
-        for (String nodeName : nodeStateMap.keySet())
-        {
-          String state = nodeStateMap.get(nodeName);
-          if (!result.get(partitionId).containsKey(state))
-          {
-            result.get(partitionId).put(state, new TreeSet<String>());
-          }
-          result.get(partitionId).get(state).add(nodeName);
-        }
-      }
-    }
-    return result;
-  }
-
-  /*
-   * The parameter is a map that maps the nodeName to a list of ZNRecords.
-   */
-  public List<ZNRecord> computeExternalView(
-      Map<String, List<ZNRecord>> currentStates, List<ZNRecord> idealStates)
-  {
-    List<ZNRecord> resultList = new ArrayList<ZNRecord>();
-    Map<String, ZNRecord> resultRoutingTable = new HashMap<String, ZNRecord>();
-    // maps from dbName to another map : partition -> map <nodename,
-    // master/slave>;
-    // Fill the routing table with "empty" default state according to ideals
-    // states
-    // in the cluster
-    if (idealStates != null)
-    {
-      for (ZNRecord idealState : idealStates)
-      {
-        ZNRecord defaultDBExternalView = new ZNRecord(idealState.getId());
-        resultRoutingTable.put(idealState.getId(), defaultDBExternalView);
-      }
-    } else
-    {
-      assert (!currentStates.isEmpty());
-      return resultList;
-    }
-    for (String nodeName : currentStates.keySet())
-    {
-      List<ZNRecord> zndbStates = currentStates.get(nodeName);
-      for (ZNRecord dbNodeStateRecord : zndbStates)
-      {
-        Map<String, Map<String, String>> dbStates = dbNodeStateRecord
-            .getMapFields();
-        for (String stateUnitKey : dbStates.keySet())
-        {
-          Map<String, String> dbPartitionStates = dbStates.get(stateUnitKey);
-          String dbName = dbPartitionStates
-              .get(Message.Attributes.RESOURCE_NAME.toString());
-          ZNRecord partitionStatus = resultRoutingTable.get(dbName);
-          if (partitionStatus == null)
-          {
-            partitionStatus = new ZNRecord(dbName);
-            resultRoutingTable.put(dbName, partitionStatus);
-          }
-          String currentStateKey = CurrentStateProperty.CURRENT_STATE.toString();
-
-          if (!partitionStatus.getMapFields().containsKey(stateUnitKey))
-          {
-            partitionStatus.setMapField(stateUnitKey,
-                new TreeMap<String, String>());
-          }
-          partitionStatus.getMapField(stateUnitKey).put(nodeName,
-              dbPartitionStates.get(currentStateKey));
-
-        }
-      }
-    }
-    for (ZNRecord record : resultRoutingTable.values())
-    {
-      resultList.add(record);
-    }
-    return resultList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java b/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
deleted file mode 100644
index 08e99c6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/GenericHelixController.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.Pipeline;
-import com.linkedin.helix.controller.pipeline.PipelineRegistry;
-import com.linkedin.helix.controller.stages.BestPossibleStateCalcStage;
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.controller.stages.CompatibilityCheckStage;
-import com.linkedin.helix.controller.stages.CurrentStateComputationStage;
-import com.linkedin.helix.controller.stages.ExternalViewComputeStage;
-import com.linkedin.helix.controller.stages.MessageGenerationPhase;
-import com.linkedin.helix.controller.stages.MessageSelectionStage;
-import com.linkedin.helix.controller.stages.MessageThrottleStage;
-import com.linkedin.helix.controller.stages.ReadClusterDataStage;
-import com.linkedin.helix.controller.stages.ResourceComputationStage;
-import com.linkedin.helix.controller.stages.TaskAssignmentStage;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.PauseSignal;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-import com.linkedin.helix.monitoring.mbeans.MessageQueueMonitor;
-
-/**
- * Cluster Controllers main goal is to keep the cluster state as close as possible to
- * Ideal State. It does this by listening to changes in cluster state and scheduling new
- * tasks to get cluster state to best possible ideal state. Every instance of this class
- * can control can control only one cluster
- *
- *
- * Get all the partitions use IdealState, CurrentState and Messages <br>
- * foreach partition <br>
- * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
- * 2. compute best possible state (instance,state) pair. This needs previous step data and
- * state model constraints <br>
- * 3. compute the messages/tasks needed to move to 1 to 2 <br>
- * 4. select the messages that can be sent, needs messages and state model constraints <br>
- * 5. send messages
- */
-public class GenericHelixController implements
-    ConfigChangeListener,
-    IdealStateChangeListener,
-    LiveInstanceChangeListener,
-    MessageListener,
-    CurrentStateChangeListener,
-    ExternalViewChangeListener,
-    ControllerChangeListener,
-    HealthStateChangeListener
-{
-  private static final Logger    logger =
-                                            Logger.getLogger(GenericHelixController.class.getName());
-  volatile boolean               init   = false;
-  private final PipelineRegistry _registry;
-
-  /**
-   * Since instance current state is per-session-id, we need to track the session-ids of
-   * the current states that the ClusterController is observing. this set contains all the
-   * session ids that we add currentState listener
-   */
-  private final Set<String>      _instanceCurrentStateChangeSubscriptionSessionIds;
-
-  /**
-   * this set contains all the instance names that we add message listener
-   */
-  private final Set<String>      _instanceSubscriptionNames;
-
-  ClusterStatusMonitor           _clusterStatusMonitor;
-  
-
-  /**
-   * The _paused flag is checked by function handleEvent(), while if the flag is set
-   * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
-   * is set.
-   */
-  private boolean                _paused;
-
-  /**
-   * The timer that can periodically run the rebalancing pipeline. The timer will start if there
-   * is one resource group has the config to use the timer.
-   */
-  Timer _rebalanceTimer = null;
-  int _timerPeriod = Integer.MAX_VALUE;
-
-  /**
-   * Default constructor that creates a default pipeline registry. This is sufficient in
-   * most cases, but if there is a some thing specific needed use another constructor
-   * where in you can pass a pipeline registry
-   */
-  public GenericHelixController()
-  {
-    this(createDefaultRegistry());
-  }
-
-  class RebalanceTask extends TimerTask
-  {
-    HelixManager _manager;
-    
-    public RebalanceTask(HelixManager manager)
-    {
-      _manager = manager;
-    }
-    
-    @Override
-    public void run()
-    {
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.CALLBACK);
-      ClusterEvent event = new ClusterEvent("periodicalRebalance");
-      event.addAttribute("helixmanager", changeContext.getManager());
-      event.addAttribute("changeContext", changeContext);
-      List<ZNRecord> dummy = new ArrayList<ZNRecord>();
-      event.addAttribute("eventData", dummy);
-      // Should be able to process  
-      handleEvent(event);
-    }
-  }
-  
-  /**
-   * Starts the rebalancing timer with the specified period. Start the timer if necessary;
-   * If the period is smaller than the current period, cancel the current timer and use 
-   * the new period.
-   */
-  void startRebalancingTimer(int period, HelixManager manager)
-  {
-    logger.info("Controller starting timer at period " + period);
-    if(period < _timerPeriod)
-    {
-      if(_rebalanceTimer != null)
-      {
-        _rebalanceTimer.cancel();
-      }
-      _rebalanceTimer = new Timer(true);
-      _timerPeriod = period;
-      _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
-    }
-    else
-    {
-      logger.info("Controller already has timer at period " + _timerPeriod);
-    }
-  }
-  
-  /**
-   * Starts the rebalancing timer 
-   */
-  void stopRebalancingTimer()
-  {
-    if(_rebalanceTimer != null)
-    {
-      _rebalanceTimer.cancel();
-      _rebalanceTimer = null;
-    }
-    _timerPeriod = Integer.MAX_VALUE;
-  }
-  
-  private static PipelineRegistry createDefaultRegistry()
-  {
-    logger.info("createDefaultRegistry");
-    synchronized (GenericHelixController.class)
-    {
-      PipelineRegistry registry = new PipelineRegistry();
-
-      // cluster data cache refresh
-      Pipeline dataRefresh = new Pipeline();
-      dataRefresh.addStage(new ReadClusterDataStage());
-
-      // rebalance pipeline
-      Pipeline rebalancePipeline = new Pipeline();
-      rebalancePipeline.addStage(new ResourceComputationStage());
-      rebalancePipeline.addStage(new CurrentStateComputationStage());
-      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-      rebalancePipeline.addStage(new MessageGenerationPhase());
-      rebalancePipeline.addStage(new MessageSelectionStage());
-      rebalancePipeline.addStage(new MessageThrottleStage());
-      rebalancePipeline.addStage(new TaskAssignmentStage());
-
-      // external view generation
-      Pipeline externalViewPipeline = new Pipeline();
-      externalViewPipeline.addStage(new ExternalViewComputeStage());
-
-      // backward compatibility check
-      Pipeline liveInstancePipeline = new Pipeline();
-      liveInstancePipeline.addStage(new CompatibilityCheckStage());
-
-      registry.register("idealStateChange", dataRefresh, rebalancePipeline);
-      registry.register("currentStateChange",
-                        dataRefresh,
-                        rebalancePipeline,
-                        externalViewPipeline);
-      registry.register("configChange", dataRefresh, rebalancePipeline);
-      registry.register("liveInstanceChange",
-                        dataRefresh,
-                        liveInstancePipeline,
-                        rebalancePipeline,
-                        externalViewPipeline);
-
-      registry.register("messageChange",
-                        dataRefresh,
-                        rebalancePipeline);
-      registry.register("externalView", dataRefresh);
-      registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
-      registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
-
-      // health stats pipeline
-      // Pipeline healthStatsAggregationPipeline = new Pipeline();
-      // StatsAggregationStage statsStage = new StatsAggregationStage();
-      // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
-      // healthStatsAggregationPipeline.addStage(statsStage);
-      // registry.register("healthChange", healthStatsAggregationPipeline);
-
-      return registry;
-    }
-  }
-
-  public GenericHelixController(PipelineRegistry registry)
-  {
-    _paused = false;
-    _registry = registry;
-    _instanceCurrentStateChangeSubscriptionSessionIds =
-        new ConcurrentSkipListSet<String>();
-    _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
-    // _externalViewGenerator = new ExternalViewGenerator();
-  }
-
-  /**
-   * lock-always: caller always needs to obtain an external lock before call, calls to
-   * handleEvent() should be serialized
-   *
-   * @param event
-   */
-  protected synchronized void handleEvent(ClusterEvent event)
-  {
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
-      logger.error("No cluster manager in event:" + event.getName());
-      return;
-    }
-
-    if (!manager.isLeader())
-    {
-      logger.error("Cluster manager: " + manager.getInstanceName()
-          + " is not leader. Pipeline will not be invoked");
-      return;
-    }
-
-    if (_paused)
-    {
-      logger.info("Cluster is paused. Ignoring the event:" + event.getName());
-      return;
-    }
-
-    NotificationContext context = null;
-    if (event.getAttribute("changeContext") != null)
-    {
-      context = (NotificationContext) (event.getAttribute("changeContext"));
-    }
-
-    // Initialize _clusterStatusMonitor
-    if (context != null)
-    {
-      if (context.getType() == Type.FINALIZE)
-      {
-        if (_clusterStatusMonitor != null)
-        {
-          _clusterStatusMonitor.reset();
-          _clusterStatusMonitor = null;
-        }
-        
-        stopRebalancingTimer();
-        logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
-        return;
-      }
-      else
-      {
-        if (_clusterStatusMonitor == null)
-        {
-          _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
-        }
-        
-        event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
-      }
-    }
-
-    List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
-    if (pipelines == null || pipelines.size() == 0)
-    {
-      logger.info("No pipeline to run for event:" + event.getName());
-      return;
-    }
-
-    for (Pipeline pipeline : pipelines)
-    {
-      try
-      {
-        pipeline.handle(event);
-        pipeline.finish();
-      }
-      catch (Exception e)
-      {
-        logger.error("Exception while executing pipeline: " + pipeline
-            + ". Will not continue to next pipeline", e);
-        break;
-      }
-    }
-  }
-
-  // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
-  // callback
-
-  @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-                                   NotificationContext changeContext)
-  {
-//    logger.info("START: GenericClusterController.onExternalViewChange()");
-//    ClusterEvent event = new ClusterEvent("externalViewChange");
-//    event.addAttribute("helixmanager", changeContext.getManager());
-//    event.addAttribute("changeContext", changeContext);
-//    event.addAttribute("eventData", externalViewList);
-//    // handleEvent(event);
-//    logger.info("END: GenericClusterController.onExternalViewChange()");
-  }
-
-  @Override
-  public void onStateChange(String instanceName,
-                            List<CurrentState> statesInfo,
-                            NotificationContext changeContext)
-  {
-    logger.info("START: GenericClusterController.onStateChange()");
-    ClusterEvent event = new ClusterEvent("currentStateChange");
-    event.addAttribute("helixmanager", changeContext.getManager());
-    event.addAttribute("instanceName", instanceName);
-    event.addAttribute("changeContext", changeContext);
-    event.addAttribute("eventData", statesInfo);
-    handleEvent(event);
-    logger.info("END: GenericClusterController.onStateChange()");
-  }
-
-  @Override
-  public void onHealthChange(String instanceName,
-                             List<HealthStat> reports,
-                             NotificationContext changeContext)
-  {
-    /**
-     * When there are more participant ( > 20, can be in hundreds), This callback can be
-     * called quite frequently as each participant reports health stat every minute. Thus
-     * we change the health check pipeline to run in a timer callback.
-     */
-  }
-
-  @Override
-  public void onMessage(String instanceName,
-                        List<Message> messages,
-                        NotificationContext changeContext)
-  {
-    logger.info("START: GenericClusterController.onMessage()");
-    
-    ClusterEvent event = new ClusterEvent("messageChange");
-    event.addAttribute("helixmanager", changeContext.getManager());
-    event.addAttribute("instanceName", instanceName);
-    event.addAttribute("changeContext", changeContext);
-    event.addAttribute("eventData", messages);
-    handleEvent(event);
-    
-    if (_clusterStatusMonitor != null && messages != null)
-    {
-      _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
-    }
-        
-    logger.info("END: GenericClusterController.onMessage()");
-  }
-
-  @Override
-  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
-                                   NotificationContext changeContext)
-  {
-    logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
-    if (liveInstances == null)
-    {
-      liveInstances = Collections.emptyList();
-    }
-    // Go though the live instance list and make sure that we are observing them
-    // accordingly. The action is done regardless of the paused flag.
-    if (changeContext.getType() == NotificationContext.Type.INIT ||
-        changeContext.getType() == NotificationContext.Type.CALLBACK)
-    {
-      checkLiveInstancesObservation(liveInstances, changeContext);
-    }
-
-    ClusterEvent event = new ClusterEvent("liveInstanceChange");
-    event.addAttribute("helixmanager", changeContext.getManager());
-    event.addAttribute("changeContext", changeContext);
-    event.addAttribute("eventData", liveInstances);
-    handleEvent(event);
-    logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
-  }
-  
-  void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
-  {
-    if (manager.getConfigAccessor() == null)
-    {
-      logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
-      return;
-    }
-    
-    for(IdealState idealState : idealStates)
-    {
-      int period = idealState.getRebalanceTimerPeriod();
-      if(period > 0)
-      {
-        startRebalancingTimer(period, manager);
-      }
-    }
-  }
-  
-  @Override
-  public void onIdealStateChange(List<IdealState> idealStates,
-                                 NotificationContext changeContext)
-  {
-    logger.info("START: Generic GenericClusterController.onIdealStateChange()");
-    ClusterEvent event = new ClusterEvent("idealStateChange");
-    event.addAttribute("helixmanager", changeContext.getManager());
-    event.addAttribute("changeContext", changeContext);
-    event.addAttribute("eventData", idealStates);
-    handleEvent(event);
-    
-    if(changeContext.getType() != Type.FINALIZE)
-    {
-      checkRebalancingTimer(changeContext.getManager(), idealStates);
-    }
-    
-    logger.info("END: Generic GenericClusterController.onIdealStateChange()");
-  }
-
-  @Override
-  public void onConfigChange(List<InstanceConfig> configs,
-                             NotificationContext changeContext)
-  {
-    logger.info("START: GenericClusterController.onConfigChange()");
-    ClusterEvent event = new ClusterEvent("configChange");
-    event.addAttribute("changeContext", changeContext);
-    event.addAttribute("helixmanager", changeContext.getManager());
-    event.addAttribute("eventData", configs);
-    handleEvent(event);
-    logger.info("END: GenericClusterController.onConfigChange()");
-  }
-
-  @Override
-  public void onControllerChange(NotificationContext changeContext)
-  {
-    logger.info("START: GenericClusterController.onControllerChange()");
-    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
-
-    // double check if this controller is the leader
-    Builder keyBuilder = accessor.keyBuilder();
-    LiveInstance leader =
-        accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null)
-    {
-      logger.warn("No controller exists for cluster:"
-          + changeContext.getManager().getClusterName());
-      return;
-    }
-    else
-    {
-      String leaderName = leader.getInstanceName();
-
-      String instanceName = changeContext.getManager().getInstanceName();
-      if (leaderName == null || !leaderName.equals(instanceName))
-      {
-        logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
-            + leader);
-        return;
-      }
-    }
-
-    PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-    if (pauseSignal != null)
-    {
-      _paused = true;
-      logger.info("controller is now paused");
-    }
-    else
-    {
-      if (_paused)
-      {
-        // it currently paused
-        logger.info("controller is now resumed");
-        _paused = false;
-        ClusterEvent event = new ClusterEvent("resume");
-        event.addAttribute("changeContext", changeContext);
-        event.addAttribute("helixmanager", changeContext.getManager());
-        event.addAttribute("eventData", pauseSignal);
-        handleEvent(event);
-      }
-      else
-      {
-        _paused = false;
-      }
-    }
-    logger.info("END: GenericClusterController.onControllerChange()");
-  }
-
-  /**
-   * Go through the list of liveinstances in the cluster, and add currentstateChange
-   * listener and Message listeners to them if they are newly added. For current state
-   * change, the observation is tied to the session id of each live instance.
-   *
-   */
-  protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
-                                               NotificationContext changeContext)
-  {
-    for (LiveInstance instance : liveInstances)
-    {
-      String instanceName = instance.getId();
-      String clientSessionId = instance.getSessionId();
-      HelixManager manager = changeContext.getManager();
-
-      // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
-      // that we've added a currentState listener
-      if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
-      {
-        try
-        {
-          manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
-          _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
-          logger.info("Observing client session id: " + clientSessionId);
-        }
-        catch (Exception e)
-        {
-          logger.error("Exception adding current state and message listener for instance:"
-                           + instanceName,
-                       e);
-        }
-      }
-
-      // _instanceSubscriptionNames contains all the instanceNames that we've added a
-      // message listener
-      if (!_instanceSubscriptionNames.contains(instanceName))
-      {
-        try
-        {
-          logger.info("Adding message listener for " + instanceName);
-          manager.addMessageListener(this, instanceName);
-          _instanceSubscriptionNames.add(instanceName);
-        }
-        catch (Exception e)
-        {
-          logger.error("Exception adding message listener for instance:" + instanceName,
-                       e);
-        }
-      }
-
-      // TODO we need to remove currentState listeners and message listeners
-      // when a session or an instance no longer exists. This may happen
-      // in case of session expiry, participant rebound, participant goes and new
-      // participant comes
-
-      // TODO shi should call removeListener on the previous session id;
-      // but the removeListener with that functionality is not implemented yet
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
deleted file mode 100644
index 9ab8851..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/HelixControllerMain.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-/**
- * start cluster manager controller
- * cluster manager controller has two modes:
- * 1) stand-alone mode: in this mode each controller gets a list of clusters
- *  and competes via leader election to become the controller for any of the clusters.
- *  if a controller fails to become the leader of a given cluster, it remains as a standby
- *  and re-does the leader election when the current leader fails
- *
- * 2) distributed mode: in this mode each controller first joins as participant into
- *   a special CONTROLLER_CLUSTER. Leader election happens in this special
- *   cluster. The one that becomes the leader controls all controllers (including itself
- *   to become leaders of other clusters.
- */
-
-import java.util.Arrays;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.participant.DistClusterControllerStateModelFactory;
-import com.linkedin.helix.participant.StateMachineEngine;
-
-public class HelixControllerMain
-{
-  public static final String zkServerAddress = "zkSvr";
-  public static final String cluster = "cluster";
-  public static final String help = "help";
-  public static final String mode = "mode";
-  public static final String propertyTransferServicePort = "propertyTransferPort";
-  public static final String name = "controllerName";
-  public static final String STANDALONE = "STANDALONE";
-  public static final String DISTRIBUTED = "DISTRIBUTED";
-  private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
-
-  // hack: OptionalBuilder is not thread safe
-  @SuppressWarnings("static-access")
-  synchronized private static Options constructCommandLineOptions()
-  {
-    Option helpOption = OptionBuilder.withLongOpt(help)
-        .withDescription("Prints command-line options info").create();
-
-    Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
-        .withDescription("Provide zookeeper address").create();
-    zkServerOption.setArgs(1);
-    zkServerOption.setRequired(true);
-    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
-    Option clusterOption = OptionBuilder.withLongOpt(cluster)
-        .withDescription("Provide cluster name").create();
-    clusterOption.setArgs(1);
-    clusterOption.setRequired(true);
-    clusterOption.setArgName("Cluster name (Required)");
-
-    Option modeOption = OptionBuilder
-        .withLongOpt(mode)
-        .withDescription(
-            "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
-        .create();
-    modeOption.setArgs(1);
-    modeOption.setRequired(false);
-    modeOption.setArgName("Cluster controller mode (Optional)");
-
-    Option controllerNameOption = OptionBuilder.withLongOpt(name)
-        .withDescription("Provide cluster controller name (Optional)").create();
-    controllerNameOption.setArgs(1);
-    controllerNameOption.setRequired(false);
-    controllerNameOption.setArgName("Cluster controller name (Optional)");
-    
-    Option portOption = OptionBuilder
-        .withLongOpt(propertyTransferServicePort)
-        .withDescription(
-            "Webservice port for ZkProperty controller transfer")
-        .create();
-    portOption.setArgs(1);
-    portOption.setRequired(false);
-    portOption.setArgName("Cluster controller property transfer port (Optional)");
-    
-    Options options = new Options();
-    options.addOption(helpOption);
-    options.addOption(zkServerOption);
-    options.addOption(clusterOption);
-    options.addOption(modeOption);
-    options.addOption(portOption);
-    options.addOption(controllerNameOption);
-
-    return options;
-  }
-
-  public static void printUsage(Options cliOptions)
-  {
-    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.setWidth(1000);
-    helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
-  }
-
-  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
-  {
-    CommandLineParser cliParser = new GnuParser();
-    Options cliOptions = constructCommandLineOptions();
-
-    try
-    {
-      return cliParser.parse(cliOptions, cliArgs);
-    } catch (ParseException pe)
-    {
-      logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
-      printUsage(cliOptions);
-      System.exit(1);
-    }
-    return null;
-  }
-
-  public static void addListenersToController(HelixManager manager,
-      GenericHelixController controller)
-  {
-    try
-    {
-      manager.addConfigChangeListener(controller);
-      manager.addLiveInstanceChangeListener(controller);
-      manager.addIdealStateChangeListener(controller);
-      manager.addExternalViewChangeListener(controller);
-      manager.addControllerListener(controller);
-    } catch (ZkInterruptedException e)
-    {
-      logger
-          .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
-              + e);
-    } catch (Exception e)
-    {
-      logger.error("Error when creating HelixManagerContollerMonitor", e);
-    }
-  }
-
-  public static HelixManager startHelixController(final String zkConnectString,
-      final String clusterName, final String controllerName, final String controllerMode)
-  {
-    HelixManager manager = null;
-    try
-    {
-      if (controllerMode.equalsIgnoreCase(STANDALONE))
-      {
-        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-            InstanceType.CONTROLLER, zkConnectString);
-        manager.connect();
-      } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
-      {
-        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-            InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
-
-        DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
-            zkConnectString);
-
-        // StateMachineEngine genericStateMachineHandler = new
-        // StateMachineEngine();
-        StateMachineEngine stateMach = manager.getStateMachineEngine();
-        stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
-        // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-        // genericStateMachineHandler);
-        manager.connect();
-      } else
-      {
-        logger.error("cluster controller mode:" + controllerMode + " NOT supported");
-        // throw new
-        // IllegalArgumentException("Unsupported cluster controller mode:" +
-        // controllerMode);
-      }
-    } catch (Exception e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-
-    return manager;
-  }
-
-  public static void main(String[] args) throws Exception
-  {
-    // read the config;
-    // check if the this process is the master wait indefinitely
-    // other approach is always process the events but when updating the zk
-    // check if this is master.
-    // This is difficult to get right
-    // get the clusters to manage
-    // for each cluster create a manager
-    // add the respective listeners for each manager
-    CommandLine cmd = processCommandLineArgs(args);
-    String zkConnectString = cmd.getOptionValue(zkServerAddress);
-    String clusterName = cmd.getOptionValue(cluster);
-    String controllerMode = STANDALONE;
-    String controllerName = null;
-    int propertyTransServicePort = -1;
-    
-    if (cmd.hasOption(mode))
-    {
-      controllerMode = cmd.getOptionValue(mode);
-    }
-    
-    if(cmd.hasOption(propertyTransferServicePort))
-    {
-        propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
-    }
-    if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
-    {
-      throw new IllegalArgumentException(
-          "A unique cluster controller name is required in DISTRIBUTED mode");
-    }
-
-    controllerName = cmd.getOptionValue(name);
-
-    // Espresso_driver.py will consume this
-    logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
-        + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
-
-    if (propertyTransServicePort > 0)
-    {
-      ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
-    }
-    
-    HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
-        controllerMode);
-    try
-    {
-      Thread.currentThread().join();
-    } 
-    catch (InterruptedException e)
-    {
-      logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
-          + " interrupted");
-    }
-    finally
-    {
-      manager.disconnect();
-      ZKPropertyTransferServer.getInstance().shutdown();
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
deleted file mode 100644
index 4d9c217..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/HierarchicalDataHolder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller;
-
-import java.io.FileFilter;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-
-/**
- * Generic class that will read the data given the root path.
- * 
- * @author kgopalak
- * 
- */
-public class HierarchicalDataHolder<T>
-{
-  private static final Logger logger = Logger
-      .getLogger(HierarchicalDataHolder.class.getName());
-  AtomicReference<Node<T>> root;
-
-  /**
-   * currentVersion, gets updated when data is read from original source
-   */
-  AtomicLong currentVersion;
-  private final ZkClient _zkClient;
-  private final String _rootPath;
-  private final FileFilter _filter;
-
-  public HierarchicalDataHolder(ZkClient client, String rootPath,
-      FileFilter filter)
-  {
-    this._zkClient = client;
-    this._rootPath = rootPath;
-    this._filter = filter;
-    // Node<T> initialValue = new Node<T>();
-    root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
-    currentVersion = new AtomicLong(1);
-    refreshData();
-  }
-
-  public long getVersion()
-  {
-    return currentVersion.get();
-  }
-
-  public boolean refreshData()
-  {
-    Node<T> newRoot = new Node<T>();
-    boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
-    if (dataChanged)
-    {
-      currentVersion.getAndIncrement();
-      root.set(newRoot);
-      return true;
-    } else
-    {
-      return false;
-    }
-  }
-
-  // private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
-  // newRoot,Stat newStat, String path)
-  private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
-      String path)
-  {
-    boolean dataChanged = false;
-    Stat newStat = _zkClient.getStat(path);
-    Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
-    newRoot.name = path;
-    if (newStat != null)
-    {
-      if (oldStat == null)
-      {
-        newRoot.stat = newStat;
-        newRoot.data = _zkClient.<T> readData(path, true);
-        dataChanged = true;
-      } else if (newStat.equals(oldStat))
-      {
-        newRoot.stat = oldStat;
-        newRoot.data = oldRoot.data;
-      } else
-      {
-        dataChanged = true;
-        newRoot.stat = newStat;
-        newRoot.data = _zkClient.<T> readData(path, true);
-      }
-      if (newStat.getNumChildren() > 0)
-      {
-        List<String> children = _zkClient.getChildren(path);
-        for (String child : children)
-        {
-          String newPath = path + "/" + child;
-          Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
-              .get(child) : null;
-          if (newRoot.children == null)
-          {
-            newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
-          }
-          if (!newRoot.children.contains(child))
-          {
-            newRoot.children.put(child, new Node<T>());
-          }
-          Node<T> newChild = newRoot.children.get(child);
-          boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
-          dataChanged = dataChanged || childChanged;
-        }
-      }
-    } else
-    {
-      logger.info(path + " does not exist");
-    }
-    return dataChanged;
-  }
-
-  static class Node<T>
-  {
-    String name;
-    Stat stat;
-    T data;
-    ConcurrentHashMap<String, Node<T>> children;
-
-  }
-
-  public void print()
-  {
-    logger.info("START "+ _rootPath);
-    LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
-    stack.push(root.get());
-    while (!stack.isEmpty())
-    {
-      Node<T> pop = stack.pop();
-      if (pop != null)
-      {
-        logger.info("name:"+ pop.name);
-        logger.info("\tdata:"+pop.data);
-        if (pop.children != null)
-        {
-          for (Node<T> child : pop.children.values())
-          {
-            stack.push(child);
-          }
-        }
-      }
-    }
-    logger.info("END "+ _rootPath);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
deleted file mode 100644
index 54013e4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Helix cluster controller
- */
-package com.linkedin.helix.controller;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
deleted file mode 100644
index 71b4a92..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/AbstractBaseStage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.Map;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.monitoring.mbeans.HelixStageLatencyMonitor;
-
-public class AbstractBaseStage implements Stage
-{
-  @Override
-  public void init(StageContext context)
-  {
-
-  }
-
-  @Override
-  public void preProcess()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-
-  }
-
-  @Override
-  public void postProcess()
-  {
-
-  }
-
-  @Override
-  public void release()
-  {
-
-  }
-
-  @Override
-  public String getStageName()
-  {
-    // default stage name will be the class name
-    String className = this.getClass().getName();
-    return className;
-  }
-
-  public void addLatencyToMonitor(ClusterEvent event, long latency)
-  {
-    Map<String, HelixStageLatencyMonitor> stgLatencyMonitorMap =
-        event.getAttribute("HelixStageLatencyMonitorMap");
-    if (stgLatencyMonitorMap != null)
-    {
-      if (stgLatencyMonitorMap.containsKey(getStageName()))
-      {
-        HelixStageLatencyMonitor stgLatencyMonitor =
-            stgLatencyMonitorMap.get(getStageName());
-        stgLatencyMonitor.addStgLatency(latency);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
deleted file mode 100644
index 63518b1..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Pipeline.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-
-public class Pipeline
-{
-  private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
-  List<Stage> _stages;
-
-  public Pipeline()
-  {
-    _stages = new ArrayList<Stage>();
-  }
-
-  public void addStage(Stage stage)
-  {
-    _stages.add(stage);
-    StageContext context = null;
-    stage.init(context);
-  }
-
-  public void handle(ClusterEvent event) throws Exception
-  {
-    if (_stages == null)
-    {
-      return;
-    }
-    for (Stage stage : _stages)
-    {
-      stage.preProcess();
-      stage.process(event);
-      stage.postProcess();
-    }
-  }
-
-  public void finish()
-  {
-
-  }
-
-  public List<Stage> getStages()
-  {
-    return _stages;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
deleted file mode 100644
index fea8cba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/PipelineRegistry.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class PipelineRegistry
-{
-  Map<String, List<Pipeline>> _map;
-
-  public PipelineRegistry()
-  {
-    _map = new HashMap<String, List<Pipeline>>();
-  }
-
-  public void register(String eventName, Pipeline... pipelines)
-  {
-    if (!_map.containsKey(eventName))
-    {
-      _map.put(eventName, new ArrayList<Pipeline>());
-    }
-    List<Pipeline> list = _map.get(eventName);
-    for (Pipeline pipeline : pipelines)
-    {
-      list.add(pipeline);
-    }
-  }
-
-  public List<Pipeline> getPipelinesForEvent(String eventName)
-  {
-    if (_map.containsKey(eventName))
-    {
-      return _map.get(eventName);
-    }
-    return Collections.emptyList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
deleted file mode 100644
index 153332a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/Stage.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-
-/**
- * Logically independent unit in processing callbacks for cluster changes  
- *
- */
-public interface Stage
-{
-
-  /**
-   * Initialize a stage
-   * @param context
-   */
-  void init(StageContext context);
-
-  /**
-   * Called before process() on each callback
-   */
-  void preProcess();
-
-  /**
-   * Actual callback processing logic
-   * @param event
-   * @throws Exception
-   */
-  public void process(ClusterEvent event) throws Exception;
-
-  /**
-   * Called after process() on each callback
-   */
-  void postProcess();
-
-  /**
-   * Destruct a stage
-   */
-  void release();
-
-  /**
-   * Get the name of the stage
-   * @return
-   */
-  public String getStageName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
deleted file mode 100644
index e400244..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-public class StageContext
-{
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
deleted file mode 100644
index 40aa0a0..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/StageException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.pipeline;
-
-public class StageException extends Exception
-{
-
-  public StageException(String message)
-  {
-    super(message);
-  }
-  public StageException(String message,Exception e)
-  {
-    super(message,e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
deleted file mode 100644
index 664d204..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/pipeline/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix controller pipeline classes that process cluster changes
- * 
- */
-package com.linkedin.helix.controller.pipeline;
\ No newline at end of file