Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[29/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestArrivingParticipantStats.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestArrivingParticipantStats.java b/helix-core/src/test/java/org/apache/helix/alerts/TestArrivingParticipantStats.java
new file mode 100644
index 0000000..010c440
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestArrivingParticipantStats.java
@@ -0,0 +1,487 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.alerts.Tuple;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestArrivingParticipantStats
+{
+  protected static final String CLUSTER_NAME = "TestCluster";
+
+  MockManager                   _helixManager;
+  StatsHolder                   _statsHolder;
+
+  @BeforeMethod(groups = { "unitTest" })
+  public void setup()
+  {
+    _helixManager = new MockManager(CLUSTER_NAME);
+    _statsHolder = new StatsHolder(_helixManager, new HealthDataCache());
+  }
+
+  public Map<String, String> getStatFields(String value, String timestamp)
+  {
+    Map<String, String> statMap = new HashMap<String, String>();
+    statMap.put(StatsHolder.VALUE_NAME, value);
+    statMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
+    return statMap;
+  }
+
+  public boolean statRecordContains(ZNRecord rec, String statName)
+  {
+    Map<String, Map<String, String>> stats = rec.getMapFields();
+    return stats.containsKey(statName);
+  }
+
+  public boolean statRecordHasValue(ZNRecord rec, String statName, String value)
+  {
+    Map<String, Map<String, String>> stats = rec.getMapFields();
+    Map<String, String> statFields = stats.get(statName);
+    return (statFields.get(StatsHolder.VALUE_NAME).equals(value));
+  }
+
+  public boolean statRecordHasTimestamp(ZNRecord rec, String statName, String timestamp)
+  {
+    Map<String, Map<String, String>> stats = rec.getMapFields();
+    Map<String, String> statFields = stats.get(statName);
+    return (statFields.get(StatsHolder.TIMESTAMP_NAME).equals(timestamp));
+  }
+
+  // Exact matching persistent stat, but has no values yet
+  @Test(groups = { "unitTest" })
+  public void testAddFirstParticipantStat() throws Exception
+  {
+    // add a persistent stat
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    _statsHolder.addStat(persistentStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
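+    // the applied value "0" and timestamp "0" come back as "0.0": persisted fields are normalized to doubles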
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
+  }
+
+  // Exact matching persistent stat; the same stat is reported twice and the values accumulate
+  @Test(groups = { "unitTest" })
+  public void testAddRepeatParticipantStat() throws Exception
+  {
+    // add a persistent stat
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    _statsHolder.addStat(persistentStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    // apply stat once and then again
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
+  }
+
+  // test to ensure backdated stats are not applied
+  @Test(groups = { "unitTest" })
+  public void testBackdatedParticipantStat() throws Exception
+  {
+    // add a persistent stat
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    _statsHolder.addStat(persistentStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    // apply stat once and then again
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    statFields = getStatFields("5", "15");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
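+    // the last sample (value 1, timestamp 10) is backdated relative to timestamp 15 and is ignored; 0 + 1 + 5 = 6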
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "6.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "15.0"));
+  }
+
+  // Wildcard persistent stat receiving its first matching value
+  @Test(groups = { "unitTest" })
+  public void testAddFirstParticipantStatToWildCard() throws Exception
+  {
+    // add a persistent stat
+    String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
+  }
+
+  // test to add 2nd report to same stat
+  @Test(groups = { "unitTest" })
+  public void testAddSecondParticipantStatToWildCard() throws Exception
+  {
+    // add a persistent stat
+    String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("1", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "2.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
+  }
+
+  // Persistent stat with wildcards in both the db and partition fields
+  @Test(groups = { "unitTest" })
+  public void testAddParticipantStatToDoubleWildCard() throws Exception
+  {
+    // add a persistent stat
+    String persistentWildcardStat = "accumulate()(db*.partition*.latency)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddWildcardInFirstStatToken() throws Exception
+  {
+    String persistentWildcardStat = "accumulate()(instance*.reportingage)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "instance10.reportingage";
+    Map<String, String> statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "accumulate()(instance10.reportingage)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
+
+  }
+
+  // test to add report to same wildcard stat, different actual stat
+  @Test(groups = { "unitTest" })
+  public void testAddTwoDistinctParticipantStatsToSameWildCard() throws Exception
+  {
+    // add a persistent stat
+    String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("1", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    incomingStatName = "dbFoo.partition11.latency";
+    statFields = getStatFields("5", "10");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "accumulate()(dbFoo.partition10.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
+    persistentStat = "accumulate()(dbFoo.partition11.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "5.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
+  }
+
+  // Window stat: values are collected into a bounded window rather than accumulated
+  @Test(groups = { "unitTest" })
+  public void testWindowStat() throws Exception
+  {
+    // add a persistent stat
+    String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
+    _statsHolder.addStat(persistentWildcardStat);
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    String persistentStat = "window(3)(dbFoo.partition10.latency)";
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
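+    // window(3) keeps up to three (value, timestamp) samples, persisted as comma-separated tuples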
+
+    // add 2nd stat
+    statFields = getStatFields("10", "1");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0"));
+
+    // add 3rd stat
+    statFields = getStatFields("20", "2");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+    
+    rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
+
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testWindowStatExpiration() throws Exception
+  {
+    String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
+    String persistentStat = "window(3)(dbFoo.partition10.latency)";
+    // init with 3 elements
+    testWindowStat();
+
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("30", "3");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
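+    // the window holds at most 3 samples, so adding (30, 3) evicts the oldest sample (0.0, 0.0)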
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "10.0,20.0,30.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "1.0,2.0,3.0"));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testWindowStatStale() throws Exception
+  {
+    String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
+    String persistentStat = "window(3)(dbFoo.partition10.latency)";
+    // init with 3 elements
+    testWindowStat();
+
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("10", "1");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
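+    // the incoming timestamp (1) is not newer than the newest sample in the window (2), so the window is unchanged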
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
+  }
+
+  // test that has 2 agg stats (accumulate and window) for the same raw stat
+  @Test(groups = { "unitTest" })
+  public void testAddStatForTwoAggTypes() throws Exception
+  {
+    // add a persistent stat
+    String persistentStatOne = "accumulate()(dbFoo.partition10.latency)";
+    String persistentStatTwo = "window(3)(dbFoo.partition10.latency)";
+    _statsHolder.addStat(persistentStatOne);
+    _statsHolder.persistStats();
+    _statsHolder.addStat(persistentStatTwo);
+    _statsHolder.persistStats();
+
+    // generate incoming stat
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("0", "0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
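+    // one incoming sample updates both the accumulate() and the window(3) stat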
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStatOne, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStatOne, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStatTwo, "0.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStatTwo, "0.0"));
+  }
+
+  // test merging 2 window stats, new is applied
+  @Test(groups = { "unitTest" })
+  public void testMergeTwoWindowsYesMerge() throws Exception
+  {
+    String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
+    String persistentStat = "window(3)(dbFoo.partition10.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    // init with 3 elements
+    testWindowStat();
+
+    // create two tuples: one for values, one for timestamps
+    Tuple<String> valTuple = new Tuple<String>();
+    Tuple<String> timeTuple = new Tuple<String>();
+    valTuple.add("30.0");
+    valTuple.add("40.0");
+    timeTuple.add("3.0");
+    timeTuple.add("4.0");
+    Map<String, String> statFields =
+        getStatFields(valTuple.toString(), timeTuple.toString());
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+    System.out.println("rec: " + rec.toString());
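+    // merging keeps the three newest samples by timestamp: (20,2), (30,3), (40,4)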
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "20.0,30.0,40.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "2.0,3.0,4.0"));
+  }
+
+  // test merging 2 window stats, new is ignored
+  @Test(groups = { "unitTest" })
+  public void testMergeTwoWindowsNoMerge() throws Exception
+  {
+    String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
+    String persistentStat = "window(3)(dbFoo.partition10.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    // init with 3 elements
+    testWindowStat();
+
+    // create two tuples: one for values, one for timestamps
+    Tuple<String> valTuple = new Tuple<String>();
+    Tuple<String> timeTuple = new Tuple<String>();
+    valTuple.add("0.0");
+    valTuple.add("40.0");
+    timeTuple.add("0.0");
+    timeTuple.add("4.0");
+    Map<String, String> statFields =
+        getStatFields(valTuple.toString(), timeTuple.toString());
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+
+    // check persistent stats
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+    System.out.println("rec: " + rec.toString());
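+    // the incoming tuple (timestamps 0.0 and 4.0) is treated as stale and ignored, leaving the existing window unchanged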
+    AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
+    AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestBaseStatsValidation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestBaseStatsValidation.java b/helix-core/src/test/java/org/apache/helix/alerts/TestBaseStatsValidation.java
new file mode 100644
index 0000000..01d232e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestBaseStatsValidation.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.alerts.ExpressionOperatorType;
+import org.apache.helix.alerts.ExpressionParser;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+
+@Test
+public class TestBaseStatsValidation {
+	
+	 @Test
+	  public void testParseSingletonExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String statName = "window(5)(dbFoo.partition10.latency)";
+	    try {
+			actual = ExpressionParser.getBaseStats(statName);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals(statName, actual[0]);
+	  }
+	 
+	 @Test
+	  public void testExtraParen()
+	  {
+	    String[] actual = null;
+	    
+	    String statName = "window(5)(dbFoo.partition10.latency)()";
+	    boolean caughtException = false;
+	    try {
+			actual = ExpressionParser.getBaseStats(statName);
+		} catch (HelixException e) {		
+			caughtException = true;
+			//e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals(true, caughtException);
+	  }
+	 
+	 
+	 @Test
+	  public void testParseSingletonWildcardExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String statName = "accumulate()(dbFoo.partition*.latency)";
+	    try {
+			actual = ExpressionParser.getBaseStats(statName);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals(statName, actual[0]);
+	  }
+	 
+	 @Test
+	  public void testParsePairOfExpressions()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "accumulate()(dbFoo.partition10.latency, dbFoo.partition10.count)";
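+	    // getBaseStats splits the comma-separated stat list into one complete base stat per entry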
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition10.latency)",actual[0]);
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition10.count)",actual[1]);
+	  }
+	 
+
+	/*
+	 * SUM is not to be persisted, so pull out the pieces
+	 */
+	 @Test
+	  public void testSUMExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "accumulate()(dbFoo.partition*.latency)|SUM";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition*.latency)", actual[0]);
+	  }
+	 
+	 @Test
+	  public void testSumPairExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "window(5)(dbFoo.partition10.latency, dbFoo.partition11.latency)|SUM";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("window(5)(dbFoo.partition10.latency)", actual[0]);
+	    AssertJUnit.assertEquals("window(5)(dbFoo.partition11.latency)", actual[1]);
+	  }
+	 
+	 @Test
+	  public void testEachPairExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EACH";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition*.latency)", actual[0]);
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition*.count)", actual[1]);
+	  }
+	 
+	 @Test
+	  public void testAccumulateExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "accumulate()(dbFoo.partition10.latency)|ACCUMULATE";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition10.latency)", actual[0]);
+	  }
+	 
+	 @Test
+	  public void testAccumulateEachExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "window(5)(dbFoo.partition*.latency)|EACH|ACCUMULATE";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("window(5)(dbFoo.partition*.latency)", actual[0]);
+	  }
+
+	 @Test
+	  public void testAccumulateEachPairExpression()
+	  {
+	    String[] actual = null;
+	    
+	    String expression = "accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EACH|ACCUMULATE|DIVIDE";
+	    try {
+			actual = ExpressionParser.getBaseStats(expression);
+		} catch (HelixException e) {		
+			e.printStackTrace();
+		}
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition*.latency)", actual[0]);
+	    AssertJUnit.assertEquals("accumulate()(dbFoo.partition*.count)", actual[1]);
+	  }
+	 
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestEvaluateAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestEvaluateAlerts.java b/helix-core/src/test/java/org/apache/helix/alerts/TestEvaluateAlerts.java
new file mode 100644
index 0000000..c83c2fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestEvaluateAlerts.java
@@ -0,0 +1,391 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertProcessor;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestEvaluateAlerts {
+  protected static final String CLUSTER_NAME = "TestCluster";
+
+  MockManager _helixManager;
+  AlertsHolder _alertsHolder;
+  StatsHolder _statsHolder;
+
+  public final String EXP = AlertParser.EXPRESSION_NAME;
+  public final String CMP = AlertParser.COMPARATOR_NAME;
+  public final String CON = AlertParser.CONSTANT_NAME;
+
+  @BeforeMethod (groups = {"unitTest"})
+  public void setup()
+  {
+    HealthDataCache cache = new HealthDataCache();
+    _helixManager = new MockManager(CLUSTER_NAME);
+    _alertsHolder = new AlertsHolder(_helixManager, cache);
+    _statsHolder = _alertsHolder._statsHolder;
+  }
+
+  public Map<String,String> getStatFields(String value, String timestamp)
+  {
+    Map<String, String> statMap = new HashMap<String,String>();
+    statMap.put(StatsHolder.VALUE_NAME, value);
+    statMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
+    return statMap;
+  }
+
+  public String getSimpleStat() throws HelixException
+  {
+    String stat = "accumulate()(dbFoo.partition10.latency)";
+    //_statsHolder.addStat(stat);
+    return stat;
+  }
+
+  public String addPairOfStats() throws HelixException
+  {
+    String stat = "accumulate()(dbFoo.partition10.latency, dbFoo.partition11.latency)";
+    _statsHolder.addStat(stat);
+    _statsHolder.persistStats();
+    return stat;
+  }
+
+  public String getWildcardStat() throws HelixException
+  {
+    String stat = "accumulate()(dbFoo.partition*.latency)";
+    //_statsHolder.addStat(stat);
+    return stat;
+  }
+
+  public String addSimpleAlert() throws HelixException
+  {
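+    // alert format: EXP(<stat expression>)CMP(<comparator>)CON(<constant>); this alert fires when the accumulated latency exceeds 100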
+    String alert = EXP + "(accumulate()(dbFoo.partition10.latency))"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.latency))"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addTwoWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.put*))"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+
+  public String addExpandWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.latency)|EXPAND)"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addExpandSumAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition10.latency,dbFoo.partition11.latency)|EXPAND|SUM)"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addExpandSumWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.success,dbFoo.partition*.failure)|EXPAND|SUM)"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addExpandSumEachWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.success,dbFoo.partition*.failure)|EXPAND|SUMEACH)"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addExpandSumEachSumWildcardAlert() throws HelixException
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.success,dbFoo.partition*.failure)|EXPAND|SUMEACH|SUM)"
+        + CMP + "(GREATER)" + CON + "(100)";
+       _alertsHolder.addAlert(alert);
+       return alert;
+  }
+
+  public String addArrivingSimpleStat() throws HelixException
+  {
+    _statsHolder.refreshStats();
+    String incomingStatName = "dbFoo.partition10.latency";
+    Map<String, String> statFields = getStatFields("110","0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+    return incomingStatName;
+  }
+
+  public String addArrivingPairOfStats() throws HelixException
+  {
+    _statsHolder.refreshStats();
+    String incomingStatName1 = "dbFoo.partition10.latency";
+    String incomingStatName2 = "dbFoo.partition11.latency";
+    Map<String, String> statFields = getStatFields("50","0");
+    _statsHolder.applyStat(incomingStatName1, statFields);
+    statFields = getStatFields("51","0");
+    _statsHolder.applyStat(incomingStatName2, statFields);
+    _statsHolder.persistStats();
+    return null;
+  }
+
+  @Test (groups = {"unitTest"})
+  public void testSimpleAlertFires()
+  {
+    String alert = addSimpleAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    _statsHolder.refreshStats(); //need to refresh since not triggered by stats aggregation stage
+    addArrivingSimpleStat();
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    boolean alertFired = alertResult.get(alert).get(AlertProcessor.noWildcardAlertKey).isFired();
+     AssertJUnit.assertTrue(alertFired);
+  }
+
+  @Test (groups = {"unitTest"})
+  public void testSimpleAlertNoStatArrivesFires()
+  {
+    String alert = addSimpleAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    AssertJUnit.assertEquals(null, alertResult.get(AlertProcessor.noWildcardAlertKey));
+  }
+
+  @Test (groups = {"unitTest"})
+  public void testWildcardAlertFires()
+  {
+    String alert = addWildcardAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    String incomingStatName = addArrivingSimpleStat();
+
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    String wildcardBinding = incomingStatName;
+    boolean alertFired = alertResult.get(alert).get(wildcardBinding).isFired();
+    AssertJUnit.assertTrue(alertFired);
+  }
+
+  @Test (groups = {"unitTest"})
+  public void testExpandOperatorWildcardAlertFires()
+  {
+    String alert = addExpandWildcardAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    String incomingStatName = addArrivingSimpleStat();
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    String wildcardBinding = incomingStatName;
+    boolean alertFired = alertResult.get(alert).get(wildcardBinding).isFired();
+    AssertJUnit.assertTrue(alertFired);
+  }
+
+  @Test (groups = {"unitTest"})
+  public void testExpandSumOperatorAlertFires()
+  {
+    String alert = addExpandSumAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    addArrivingPairOfStats();
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    boolean alertFired = alertResult.get(alert).get(AlertProcessor.noWildcardAlertKey).isFired();
+    AssertJUnit.assertTrue(alertFired);
+  }
+/**
+ *
+ * We need to re-decide how to support specifying more than one stat in
+ * an alert.
+ *
+ * Probably, instead of
+ * "(dbFoo.partition*.success,dbFoo.partition*.failure)", use the form
+ * "(dbFoo.partition*.(success, failure))", since the stat source is always the
+ * same.
+ *
+  //@Test (groups = {"unitTest"})
+  public void testExpandSumOperatorWildcardAlert()
+  {
+    String alert = addExpandSumWildcardAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    String part10SuccStat = "dbFoo.partition10.success";
+    String part10FailStat = "dbFoo.partition10.failure";
+    String part11SuccStat = "dbFoo.partition11.success";
+    String part11FailStat = "dbFoo.partition11.failure";
+
+
+    Map<String, String> statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part10SuccStat, statFields);
+    statFields = getStatFields("51","0");
+    _statsHolder.applyStat(part10FailStat, statFields);
+    statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part11SuccStat, statFields);
+    statFields = getStatFields("49","0");
+    _statsHolder.applyStat(part11FailStat, statFields);
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    boolean alertFired = alertResult.get(alert).get("10").isFired(); //10 should fire
+    AssertJUnit.assertTrue(alertFired);
+    alertFired = alertResult.get(alert).get("11").isFired(); //11 should not fire
+    AssertJUnit.assertFalse(alertFired);
+  }
+
+  //@Test (groups = {"unitTest"})
+  public void testExpandSumEachSumOperatorWildcardAlert()
+  {
+    String alert = addExpandSumEachSumWildcardAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    String part10SuccStat = "dbFoo.partition10.success";
+    String part10FailStat = "dbFoo.partition10.failure";
+    String part11SuccStat = "dbFoo.partition11.success";
+    String part11FailStat = "dbFoo.partition11.failure";
+
+
+    Map<String, String> statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part10SuccStat, statFields);
+    statFields = getStatFields("51","0");
+    _statsHolder.applyStat(part10FailStat, statFields);
+    statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part11SuccStat, statFields);
+    statFields = getStatFields("49","0");
+    _statsHolder.applyStat(part11FailStat, statFields);
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    boolean alertFired = alertResult.get(alert).get(_statsHolder.getStatsList().get(0)).isFired(); //10 should fire
+    AssertJUnit.assertTrue(alertFired);
+  }
+
+  //@Test (groups = {"unitTest"})
+  public void testTwoAlerts()
+  {
+    //alert 1
+    String alert1 = addSimpleAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert1);
+    addArrivingSimpleStat();
+
+    //alert 2
+    String alert2 = addExpandSumWildcardAlert();
+    stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert2);
+    String part10SuccStat = "dbFoo.partition10.success";
+    String part10FailStat = "dbFoo.partition10.failure";
+    String part11SuccStat = "dbFoo.partition11.success";
+    String part11FailStat = "dbFoo.partition11.failure";
+
+
+    Map<String, String> statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part10SuccStat, statFields);
+    statFields = getStatFields("51","0");
+    _statsHolder.applyStat(part10FailStat, statFields);
+    statFields = getStatFields("50","0");
+    _statsHolder.applyStat(part11SuccStat, statFields);
+    statFields = getStatFields("49","0");
+    _statsHolder.applyStat(part11FailStat, statFields);
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+
+    //alert 1 check
+    boolean alertFired = alertResult.get(alert1).get(AlertProcessor.noWildcardAlertKey).isFired();
+    AssertJUnit.assertTrue(alertFired);
+
+    //alert 2 check
+    alertFired = alertResult.get(alert2).get("10").isFired(); //10 should fire
+    AssertJUnit.assertTrue(alertFired);
+    alertFired = alertResult.get(alert2).get("11").isFired(); //11 should not fire
+    AssertJUnit.assertFalse(alertFired);
+
+  }
+*/
+  @Test (groups = {"unitTest"})
+    public void testAddWildcardInFirstStatToken() throws Exception
+    {
+    String alert = "EXP(decay(1)(instance*.reportingage))CMP(GREATER)CON(300)";
+     _alertsHolder.addAlert(alert);
+     _statsHolder.persistStats();
+     
+     _statsHolder.refreshStats();
+     //generate incoming stat
+     String incomingStatName = "instance10.reportingage";
+     Map<String, String> statFields = getStatFields("301","10");
+     _statsHolder.refreshStats();
+     
+     _statsHolder.applyStat(incomingStatName, statFields);
+     _statsHolder.persistStats();
+
+     Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+     String wildcardBinding = incomingStatName;
+     boolean alertFired = alertResult.get(alert).get(wildcardBinding).isFired();
+     AssertJUnit.assertTrue(alertFired);
+    }
+
+  @Test (groups = {"unitTest"})
+  public void testTwoWildcardAlertFires()
+  {
+    //error is with * and )
+    String alert = addTwoWildcardAlert();
+    String stat = AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert);
+    String incomingStatName = "dbFoo.partition10.putCount";
+    Map<String, String> statFields = getStatFields("110","0");
+    _statsHolder.refreshStats();
+    _statsHolder.applyStat(incomingStatName, statFields);
+    _statsHolder.persistStats();
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    String wildcardBinding = incomingStatName; //XXX: this is not going to work...need "Count" in here too.
+    boolean alertFired = alertResult.get(alert).get(wildcardBinding).isFired();
+    AssertJUnit.assertTrue(alertFired);
+  }
+
+  /* only supporting wildcards at end of components right now
+  @Test (groups = {"unitTest"})
+  public void testTwoWildcardsNotAtEndFires()
+  {
+    String alert = EXP + "(accumulate()(dbFoo.partition*.*Count))"
+        + CMP + "(GREATER)" + CON + "(100)";
+    _alertsHolder.addAlert(alert);
+    String incomingStatName = "dbFoo.partition10.putCount";
+    Map<String, String> statFields = getStatFields("110","0");
+    _statsHolder.applyStat(incomingStatName, statFields);
+    Map<String, Map<String, AlertValueAndStatus>> alertResult = AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    String wildcardBinding = "10,put"; //XXX: this is not going to work...need "Count" in here too.
+    boolean alertFired = alertResult.get(alert).get(wildcardBinding).isFired();
+    AssertJUnit.assertTrue(alertFired);
+  }
+  */
+
+  //test using sumall
+  //test using rows where some tuples are null (no stat sent)
+  //test with window tuples where some windows are different lengths
+  //anything else, look around at the code
+
+  //next: review all older tests
+  //next: actually write the fired alerts to ZK
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestOperators.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestOperators.java b/helix-core/src/test/java/org/apache/helix/alerts/TestOperators.java
new file mode 100644
index 0000000..3cf15ea
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestOperators.java
@@ -0,0 +1,289 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.alerts.SumEachOperator;
+import org.apache.helix.alerts.SumOperator;
+import org.apache.helix.alerts.Tuple;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+
+public class TestOperators {
+	
+	SumOperator _sumOp;
+	SumEachOperator _sumEachOp;
+	
+	@BeforeMethod (groups = {"unitTest"})
+	public void setup()
+	{
+		_sumOp = new SumOperator();
+		_sumEachOp = new SumEachOperator();
+	}
+	
+	
+	@Test (groups = {"unitTest"})
+	public void testTwoNulls()
+	{
+		Tuple<String> tup1 = null;
+		Tuple<String> tup2 = null;
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals(null, resultTup);
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testOneNullLeft()
+	{
+		Tuple<String> tup1 = null;
+		Tuple<String> tup2 = new Tuple<String>();
+		tup2.add("1.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals("1.0", resultTup.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testOneNullRight()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = null;
+		tup1.add("1.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals("1.0", resultTup.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testTwoSingeltons()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = new Tuple<String>();
+		tup1.add("1.0");
+		tup2.add("2.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals("3.0", resultTup.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testThreeSingeltons()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = new Tuple<String>();
+		Tuple<String> tup3 = new Tuple<String>();
+		tup1.add("1.0");
+		tup2.add("2.0");
+		tup3.add("3.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup3List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		tup3List.add(tup3);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		tupsList.add(tup3List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals("6.0", resultTup.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testThreeTriples()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = new Tuple<String>();
+		Tuple<String> tup3 = new Tuple<String>();
+		tup1.add("1.0"); tup1.add("2.0"); tup1.add("3.0");
+		tup2.add("4.0"); tup2.add("5.0"); tup2.add("6.0");
+		tup3.add("7.0"); tup3.add("8.0"); tup3.add("9.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup3List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		tup3List.add(tup3);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		tupsList.add(tup3List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		AssertJUnit.assertEquals("12.0,15.0,18.0", resultTup.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testThreeTriplesOneMissing()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = new Tuple<String>();
+		Tuple<String> tup3 = new Tuple<String>();
+		tup1.add("1.0"); tup1.add("2.0"); tup1.add("3.0");
+		tup2.add("5.0"); tup2.add("6.0");
+		tup3.add("7.0"); tup3.add("8.0"); tup3.add("9.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup3List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1);
+		tup2List.add(tup2);
+		tup3List.add(tup3);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		tupsList.add(tup3List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup = resultIter.next();
+		//tuple 2 missing 1 entry, other 2 get bumped to right
+		AssertJUnit.assertEquals("8.0,15.0,18.0", resultTup.toString());
+	}
+
+	//test multiple rows
+	@Test (groups = {"unitTest"})
+	public void testThreeTriplesOneMissingTwoRows()
+	{
+		Tuple<String> tup1Dot1 = new Tuple<String>();
+		Tuple<String> tup2Dot1 = new Tuple<String>();
+		Tuple<String> tup3Dot1 = new Tuple<String>();
+		Tuple<String> tup1Dot2 = new Tuple<String>();
+		Tuple<String> tup2Dot2 = new Tuple<String>();
+		Tuple<String> tup3Dot2 = new Tuple<String>();
+		tup1Dot1.add("1.0"); tup1Dot1.add("2.0"); tup1Dot1.add("3.0");
+		tup2Dot1.add("5.0"); tup2Dot1.add("6.0");
+		tup3Dot1.add("7.0"); tup3Dot1.add("8.0"); tup3Dot1.add("9.0");
+		tup1Dot2.add("10.0"); tup1Dot2.add("11.0"); tup1Dot2.add("12.0");
+		tup2Dot2.add("13.0"); tup2Dot2.add("14.0"); tup2Dot2.add("15.0");
+		tup3Dot2.add("16.0"); tup3Dot2.add("17.0"); tup3Dot2.add("18.0");
+		List<Tuple<String>> tup1List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup2List = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> tup3List = new ArrayList<Tuple<String>>();
+		tup1List.add(tup1Dot1);
+		tup2List.add(tup2Dot1);
+		tup3List.add(tup3Dot1);
+		tup1List.add(tup1Dot2);
+		tup2List.add(tup2Dot2);
+		tup3List.add(tup3Dot2);
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(tup1List.iterator());
+		tupsList.add(tup2List.iterator());
+		tupsList.add(tup3List.iterator());
+		List<Iterator<Tuple<String>>> result = _sumOp.execute(tupsList);
+		AssertJUnit.assertEquals(1, result.size()); //should be just 1 iter
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup1 = resultIter.next();
+		//tuple 2 missing 1 entry, other 2 get bumped to right
+		AssertJUnit.assertEquals("8.0,15.0,18.0", resultTup1.toString());
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup2 = resultIter.next();
+		AssertJUnit.assertEquals("39.0,42.0,45.0", resultTup2.toString());
+	}
+	
+	@Test (groups = {"unitTest"})
+	public void testSumAll()
+	{
+		Tuple<String> tup1 = new Tuple<String>();
+		Tuple<String> tup2 = new Tuple<String>();
+		Tuple<String> tup3 = new Tuple<String>();
+		Tuple<String> tup4 = new Tuple<String>();
+		Tuple<String> tup5 = new Tuple<String>();
+		Tuple<String> tup6 = new Tuple<String>();
+		tup1.add("1.0"); tup2.add("2.0"); tup3.add("3.0");
+		tup4.add("4.0"); tup5.add("5.0"); tup6.add("6.0");
+		List<Tuple<String>> list1 = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> list2 = new ArrayList<Tuple<String>>();
+		List<Tuple<String>> list3 = new ArrayList<Tuple<String>>();
+		list1.add(tup1); list1.add(tup4); 
+		list2.add(tup2); list2.add(tup5);
+		list3.add(tup3); list3.add(tup6);
+		
+		List<Iterator<Tuple<String>>> tupsList = new ArrayList<Iterator<Tuple<String>>>();
+		tupsList.add(list1.iterator());
+		tupsList.add(list2.iterator());
+		tupsList.add(list3.iterator());
+		List<Iterator<Tuple<String>>> result = _sumEachOp.execute(tupsList);
+		AssertJUnit.assertEquals(3, result.size()); //SumEach returns one iterator per input list, so expect 3
+		Iterator<Tuple<String>> resultIter = result.get(0);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		Tuple<String>resultTup1 = resultIter.next();
+		AssertJUnit.assertEquals("5.0", resultTup1.toString());
+		resultIter = result.get(1);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		resultTup1 = resultIter.next();
+		AssertJUnit.assertEquals("7.0", resultTup1.toString());
+		resultIter = result.get(2);
+		AssertJUnit.assertTrue(resultIter.hasNext());
+		resultTup1 = resultIter.next();
+		AssertJUnit.assertEquals("9.0", resultTup1.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestStatsMatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestStatsMatch.java b/helix-core/src/test/java/org/apache/helix/alerts/TestStatsMatch.java
new file mode 100644
index 0000000..ccd6c2d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestStatsMatch.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import org.apache.helix.alerts.ExpressionParser;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
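+// Verifies ExpressionParser's exact-match and wildcard matching of incoming stat names against persisted stat expressions.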
+@Test
+public class TestStatsMatch {
+
+  @Test
+  public void testExactMatch()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition10.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    AssertJUnit.assertTrue(ExpressionParser.isIncomingStatExactMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testSingleWildcardMatch()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition*.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    AssertJUnit.assertTrue(ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testDoubleWildcardMatch()
+  {
+    String persistedStatName = "window(5)(db*.partition*.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    AssertJUnit.assertTrue(ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testWildcardMatchNoWildcard()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition10.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    AssertJUnit.assertFalse(ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testWildcardMatchTooManyFields()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition*.latency)";
+    String incomingStatName = "dbFoo.tableBar.partition10.latency";
+    AssertJUnit.assertFalse(ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testWildcardMatchTooFewFields()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition*.latency)";
+    String incomingStatName = "dbFoo.latency";
+    AssertJUnit.assertFalse(ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName));
+  }
+
+  @Test
+  public void testBadWildcardRepeated()
+  {
+    String persistedStatName = "window(5)(dbFoo.partition**4.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    boolean match = ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName);
+    AssertJUnit.assertFalse(match);
+  }
+
+  @Test
+  public void testBadWildcardNotAtEnd()
+  {
+    String persistedStatName = "window(5)(dbFoo.*partition.latency)";
+    String incomingStatName = "dbFoo.partition10.latency";
+    boolean match = ExpressionParser.isIncomingStatWildcardMatch(persistedStatName, incomingStatName);
+    AssertJUnit.assertFalse(match);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
new file mode 100644
index 0000000..ed54e6f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+
+public class BaseStageTest
+{
+  protected HelixManager manager;
+  protected HelixDataAccessor accessor;
+  protected ClusterEvent event;
+
+  @BeforeClass()
+  public void beforeClass()
+  {
+    String className = this.getClass().getName();
+    System.out.println("START " + className.substring(className.lastIndexOf('.') + 1)
+                       + " at "+ new Date(System.currentTimeMillis()));
+  }
+
+  @AfterClass()
+  public void afterClass()
+  {
+    String className = this.getClass().getName();
+    System.out.println("END " + className.substring(className.lastIndexOf('.') + 1)
+                       + " at "+ new Date(System.currentTimeMillis()));
+  }
+
+  @BeforeMethod()
+  public void setup()
+  {
+    String clusterName = "testCluster-" + UUID.randomUUID().toString();
+    manager = new Mocks.MockManager(clusterName);
+    accessor = manager.getHelixDataAccessor();
+    event = new ClusterEvent("sampleEvent");
+  }
+
+  protected List<IdealState> setupIdealState(int nodes, String[] resources,
+                                             int partitions, int replicas)
+  {
+    List<IdealState> idealStates = new ArrayList<IdealState>();
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < nodes; i++)
+    {
+      instances.add("localhost_" + i);
+    }
+
+    for (int i = 0; i < resources.length; i++)
+    {
+      String resourceName = resources[i];
+      ZNRecord record = new ZNRecord(resourceName);
+      for (int p = 0; p < partitions; p++)
+      {
+        List<String> value = new ArrayList<String>();
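+        // replica r of partition p is placed on node (p + r + 1) % nodes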
+        for (int r = 0; r < replicas; r++)
+        {
+          value.add("localhost_" + (p + r + 1) % nodes);
+        }
+        record.setListField(resourceName + "_" + p, value);
+      }
+      IdealState idealState = new IdealState(record);
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+//      System.out.println(idealState);
+
+      Builder keyBuilder = accessor.keyBuilder();
+
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  protected void setupLiveInstances(int numLiveInstances)
+  {
+    // setup liveInstances
+    for (int i = 0; i < numLiveInstances; i++)
+    {
+      LiveInstance liveInstance = new LiveInstance("localhost_" + i);
+      liveInstance.setSessionId("session_" + i);
+      
+      Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.liveInstance("localhost_" + i), liveInstance);
+    }
+  }
+
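+  // Drives a stage through its full lifecycle: init, preProcess, process (exceptions are printed, not rethrown), postProcess.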
+  protected void runStage(ClusterEvent event, Stage stage)
+  {
+    event.addAttribute("helixmanager", manager);
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    stage.postProcess();
+  }
+
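+  // Registers the MasterSlave, LeaderStandby and OnlineOffline state model definitions with the accessor.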
+  protected void setupStateModel()
+  {
+    ZNRecord masterSlave = new StateModelConfigGenerator()
+        .generateConfigForMasterSlave();
+    
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), new StateModelDefinition(masterSlave));
+    
+    ZNRecord leaderStandby = new StateModelConfigGenerator()
+        .generateConfigForLeaderStandby();
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), new StateModelDefinition(leaderStandby));
+
+    ZNRecord onlineOffline = new StateModelConfigGenerator()
+        .generateConfigForOnlineOffline();
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), new StateModelDefinition(onlineOffline));
+  }
+
+  protected Map<String, Resource> getResourceMap()
+  {
+    Map<String, Resource> resourceMap = new HashMap<String, Resource>();
+    Resource testResource = new Resource("testResourceName");
+    testResource.setStateModelDefRef("MasterSlave");
+    testResource.addPartition("testResourceName_0");
+    testResource.addPartition("testResourceName_1");
+    testResource.addPartition("testResourceName_2");
+    testResource.addPartition("testResourceName_3");
+    testResource.addPartition("testResourceName_4");
+    resourceMap.put("testResourceName", testResource);
+
+    return resourceMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
new file mode 100644
index 0000000..58e84c7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -0,0 +1,268 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
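+// Minimal HelixManager stub for controller stage tests: only getClusterName(), getSessionId() and
+// getHelixDataAccessor() return real values; all other methods are unimplemented no-ops.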
+public class DummyClusterManager implements HelixManager
+{
+  HelixDataAccessor _accessor;
+  String _clusterName;
+  String _sessionId;
+
+  public DummyClusterManager(String clusterName, HelixDataAccessor accessor)
+  {
+    _clusterName = clusterName;
+    _accessor = accessor;
+    _sessionId = "session_" + clusterName;
+  }
+
+  @Override
+  public void connect() throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public void disconnect()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+                                            String instanceName,
+                                            String sessionId) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public boolean removeListener(Object listener)
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public DataAccessor getDataAccessor()
+  {
+    return null;
+  }
+
+  @Override
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  @Override
+  public String getInstanceName()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public String getSessionId()
+  {
+    return _sessionId;
+  }
+
+  @Override
+  public long getLastNotificationTime()
+  {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public PropertyStore<ZNRecord> getPropertyStore()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public InstanceType getInstanceType()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public String getVersion()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener,
+                                           String instanceName) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public boolean isLeader()
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void startTimerTasks()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void stopTimerTasks()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor()
+  {
+    return _accessor;
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback)
+  {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
new file mode 100644
index 0000000..d227784
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestBestPossibleStateCalcStage extends BaseStageTest
+{
+  @Test
+  public void testSimple()
+  {
+  	System.out.println("START TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis()));
+//    List<IdealState> idealStates = new ArrayList<IdealState>();
+
+    String[] resources = new String[]{ "testResourceName" };
+    setupIdealState(5, resources, 10, 1);
+    setupLiveInstances(5);
+    setupStateModel();
+
+    Map<String, Resource> resourceMap = getResourceMap();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    event.addAttribute(AttributeName.RESOURCES.toString(),
+        resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(),
+        currentStateOutput);
+
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    runStage(event, stage1);
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    runStage(event, stage2);
+
+    BestPossibleStateOutput output = event
+        .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
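+    // with one replica per partition, the MASTER for testResourceName_p is expected on localhost_{(p + 1) % 5}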
+    for (int p = 0; p < 5; p++)
+    {
+      Partition resource = new Partition("testResourceName_" + p);
+      AssertJUnit.assertEquals(
+          "MASTER",
+          output.getInstanceStateMap("testResourceName", resource).get(
+              "localhost_" + (p + 1) % 5));
+    }
+    System.out.println("END TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
new file mode 100644
index 0000000..3080428
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+@Test
+public class TestClusterEvent
+{
+  @Test
+  public void testSimplePutAndGet() {
+    ClusterEvent event = new ClusterEvent("name");
+    AssertJUnit.assertEquals(event.getName(), "name");
+    event.addAttribute("attr1", "value");
+    AssertJUnit.assertEquals(event.getAttribute("attr1"), "value");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
new file mode 100644
index 0000000..4068fc4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.Mocks;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.CompatibilityCheckStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestCompatibilityCheckStage extends BaseStageTest
+{
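+  // Seeds one ideal state and one live instance; participantVersion (when non-null) is stamped on the live
+  // instance and controllerVersion on the mock manager, then ReadClusterDataStage is run against the event.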
+  private void prepare(String controllerVersion, String participantVersion)
+  {
+    List<String> instances = Arrays.asList("localhost_0", "localhost_1",
+                                           "localhost_2", "localhost_3", "localhost_4");
+    int partitions = 10;
+    int replicas = 1;
+
+    // set ideal state
+    String resourceName = "testResource";
+    ZNRecord record = IdealStateCalculatorForStorageNode.calculateIdealState(
+        instances, partitions, replicas, resourceName, "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(record);
+    idealState.setStateModelDefRef("MasterSlave");
+    
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+
+    // set live instances
+    record = new ZNRecord("localhost_0");
+    if (participantVersion != null)
+    {
+      record.setSimpleField(LiveInstanceProperty.HELIX_VERSION.toString(), participantVersion);
+    }
+    LiveInstance liveInstance = new LiveInstance(record);
+    liveInstance.setSessionId("session_0");
+    accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
+
+    if (controllerVersion != null)
+    {
+      ((Mocks.MockManager)manager).setVersion(controllerVersion);
+    }
+    event.addAttribute("helixmanager", manager);
+    runStage(event, new ReadClusterDataStage());
+  }
+
+  @Test
+  public void testCompatible()
+  {
+    prepare("0.4.0", "0.4.0");
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+    }
+    catch (Exception e)
+    {
+      Assert.fail("Should not fail since versions are compatible");
+    }
+    stage.postProcess();
+  }
+
+  @Test
+  public void testNullParticipantVersion()
+  {
+    prepare("0.4.0", null);
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+    }
+    catch (Exception e)
+    {
+      Assert.fail("Should not fail since only participant version is null");
+    }
+    stage.postProcess();
+  }
+
+  @Test
+  public void testNullControllerVersion()
+  {
+    prepare(null, "0.4.0");
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+      Assert.fail("Should fail since controller version is null");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    stage.postProcess();
+  }
+
+  @Test
+  public void testControllerVersionLessThanParticipantVersion()
+  {
+    prepare("0.2.12", "0.3.4");
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+      Assert.fail("Should fail since controller primary version is less than participant primary version");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    stage.postProcess();
+  }
+
+  @Test
+  public void testIncompatible()
+  {
+    prepare("0.4.12", "0.3.4");
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try
+    {
+      stage.process(event);
+      Assert.fail("Should fail since controller primary version is incompatible with participant primary version");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    stage.postProcess();
+  }
+
+}