You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[28/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
new file mode 100644
index 0000000..291cd1a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -0,0 +1,258 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKCallback extends ZkUnitTestBase
+{
+  private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+  ZkClient _zkClient;
+
+  private static String[] createArgs(String str)
+  {
+    String[] split = str.split("[ ]+");
+    System.out.println(Arrays.toString(split));
+    return split;
+  }
+
+  public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
+      ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+      IdealStateChangeListener
+  {
+    boolean externalViewChangeReceived = false;
+    boolean liveInstanceChangeReceived = false;
+    boolean configChangeReceived = false;
+    boolean currentStateChangeReceived = false;
+    boolean messageChangeReceived = false;
+    boolean idealStateChangeReceived = false;
+
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext)
+    {
+      externalViewChangeReceived = true;
+    }
+
+    @Override
+    public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+        NotificationContext changeContext)
+    {
+      currentStateChangeReceived = true;
+    }
+
+    @Override
+    public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
+    {
+      configChangeReceived = true;
+    }
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+        NotificationContext changeContext)
+    {
+      liveInstanceChangeReceived = true;
+    }
+
+    @Override
+    public void onMessage(String instanceName, List<Message> messages,
+        NotificationContext changeContext)
+    {
+      messageChangeReceived = true;
+    }
+
+    void Reset()
+    {
+      externalViewChangeReceived = false;
+      liveInstanceChangeReceived = false;
+      configChangeReceived = false;
+      currentStateChangeReceived = false;
+      messageChangeReceived = false;
+      idealStateChangeReceived = false;
+    }
+
+    @Override
+    public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
+    {
+      // TODO Auto-generated method stub
+      idealStateChangeReceived = true;
+    }
+  }
+
+  @Test()
+  public void testInvocation() throws Exception
+  {
+
+    HelixManager testHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
+        "localhost_8900", InstanceType.PARTICIPANT, ZK_ADDR);
+    testHelixManager.connect();
+
+    TestZKCallback test = new TestZKCallback();
+
+    TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
+
+    testHelixManager.addMessageListener(testListener, "localhost_8900");
+    testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
+        testHelixManager.getSessionId());
+    testHelixManager.addConfigChangeListener(testListener);
+    testHelixManager.addIdealStateChangeListener(testListener);
+    testHelixManager.addExternalViewChangeListener(testListener);
+    testHelixManager.addLiveInstanceChangeListener(testListener);
+    // Initial add listener should trigger the first execution of the
+    // listener callbacks
+    AssertJUnit.assertTrue(testListener.configChangeReceived
+        & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
+        & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived
+        & testListener.messageChangeReceived);
+
+    testListener.Reset();
+    HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    
+    ExternalView extView = new ExternalView("db-12345");
+    accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
+    Thread.sleep(100);
+    AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
+    testListener.Reset();
+
+    CurrentState curState = new CurrentState("db-12345");
+    curState.setSessionId("sessionId");
+    curState.setStateModelDefRef("StateModelDef");
+    accessor.setProperty(keyBuilder.currentState("localhost_8900", testHelixManager.getSessionId(), curState.getId()), curState);
+    Thread.sleep(100);
+    AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
+    testListener.Reset();
+
+    IdealState idealState = new IdealState("db-1234");
+    idealState.setNumPartitions(400);
+    idealState.setReplicas(Integer.toString(2));
+    idealState.setStateModelDefRef("StateModeldef");
+    accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
+    Thread.sleep(100);
+    AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+    testListener.Reset();
+
+    // dummyRecord = new ZNRecord("db-12345");
+    // dataAccessor.setProperty(PropertyType.IDEALSTATES, idealState, "db-12345"
+    // );
+    // Thread.sleep(100);
+    // AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+    // testListener.Reset();
+
+    // dummyRecord = new ZNRecord("localhost:8900");
+    // List<ZNRecord> recList = new ArrayList<ZNRecord>();
+    // recList.add(dummyRecord);
+
+    testListener.Reset();
+    Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    message.setTgtSessionId("*");
+    message.setResourceName("testResource");
+    message.setPartitionName("testPartitionKey");
+    message.setStateModelDef("MasterSlave");
+    message.setToState("toState");
+    message.setFromState("fromState");
+    message.setTgtName("testTarget");
+    message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+
+    accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
+    Thread.sleep(500);
+    AssertJUnit.assertTrue(testListener.messageChangeReceived);
+
+    // dummyRecord = new ZNRecord("localhost_9801");
+    LiveInstance liveInstance = new LiveInstance("localhost_9801");
+    liveInstance.setSessionId(UUID.randomUUID().toString());
+    liveInstance.setHelixVersion(UUID.randomUUID().toString());
+    accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
+    Thread.sleep(500);
+    AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
+    testListener.Reset();
+
+    // dataAccessor.setNodeConfigs(recList); Thread.sleep(100);
+    // AssertJUnit.assertTrue(testListener.configChangeReceived);
+    // testListener.Reset();
+  }
+
+  @BeforeClass()
+  public void beforeClass() throws IOException, Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    if (_zkClient.exists("/" + clusterName))
+    {
+      _zkClient.deleteRecursive("/" + clusterName);
+    }
+
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addCluster "
+        + clusterName));
+    // ClusterSetup
+    // .processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR +
+    // " -addCluster relay-cluster-12345"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addResource "
+        + clusterName + " db-12345 120 MasterSlave"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+        + " localhost:8900"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+        + " localhost:8901"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+        + " localhost:8902"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+        + " localhost:8903"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+        + " localhost:8904"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -rebalance "
+        + clusterName + " db-12345 3"));
+  }
+
+  @AfterClass()
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java b/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
new file mode 100644
index 0000000..92c858c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
@@ -0,0 +1,208 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.ExternalViewGenerator;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.CurrentState.CurrentStateProperty;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZKRoutingInfoProvider
+{
+  public Map<String, List<ZNRecord>> createCurrentStates(String[] dbNames,
+      String[] nodeNames, int[] partitions, int[] replicas)
+  {
+    Map<String, List<ZNRecord>> currentStates = new TreeMap<String, List<ZNRecord>>();
+    Map<String, Map<String, ZNRecord>> currentStates2 = new TreeMap<String, Map<String, ZNRecord>>();
+
+    Map<String, String> stateMaster = new TreeMap<String, String>();
+    stateMaster.put(CurrentStateProperty.CURRENT_STATE.toString(), "MASTER");
+
+    Map<String, String> stateSlave = new TreeMap<String, String>();
+    stateSlave.put(CurrentStateProperty.CURRENT_STATE.toString(), "SLAVE");
+
+    for (int i = 0; i < nodeNames.length; i++)
+    {
+      currentStates.put(nodeNames[i], new ArrayList<ZNRecord>());
+      currentStates2.put(nodeNames[i], new TreeMap<String, ZNRecord>());
+      for (int j = 0; j < dbNames.length; j++)
+      {
+        ZNRecord dbPartitionState = new ZNRecord(dbNames[j]);
+        currentStates2.get(nodeNames[i]).put(dbNames[j], dbPartitionState);
+      }
+    }
+
+    Random rand = new Random(1234);
+    for (int j = 0; j < dbNames.length; j++)
+    {
+      int partition = partitions[j];
+      ArrayList<Integer> randomArray = new ArrayList<Integer>();
+      for (int i = 0; i < partition; i++)
+      {
+        randomArray.add(i);
+      }
+      Collections.shuffle(randomArray, rand);
+
+      for (int i = 0; i < partition; i++)
+      {
+        stateMaster.put(Message.Attributes.RESOURCE_NAME.toString(),
+            dbNames[j]);
+        stateSlave.put(Message.Attributes.RESOURCE_NAME.toString(),
+            dbNames[j]);
+        int nodes = nodeNames.length;
+        int master = randomArray.get(i) % nodes;
+        String partitionName = dbNames[j] + ".partition-" + i;
+        Map<String, Map<String, String>> map = (currentStates2
+            .get(nodeNames[master]).get(dbNames[j]).getMapFields());
+        assert (map != null);
+        map.put(partitionName, stateMaster);
+
+        for (int k = 1; k <= replicas[j]; k++)
+        {
+          int slave = (master + k) % nodes;
+          Map<String, Map<String, String>> map2 = currentStates2
+              .get(nodeNames[slave]).get(dbNames[j]).getMapFields();
+
+          map2.put(partitionName, stateSlave);
+        }
+      }
+    }
+    for (String nodeName : currentStates2.keySet())
+    {
+      Map<String, ZNRecord> recMap = currentStates2.get(nodeName);
+      List<ZNRecord> list = new ArrayList<ZNRecord>();
+      for (ZNRecord rec : recMap.values())
+      {
+        list.add(rec);
+      }
+      currentStates.put(nodeName, list);
+    }
+    return currentStates;
+  }
+
+  private void verify(Map<String, List<ZNRecord>> currentStates,
+      Map<String, Map<String, Set<String>>> routingMap)
+  {
+    int counter1 = 0;
+    int counter2 = 0;
+    for (String nodeName : currentStates.keySet())
+    {
+      List<ZNRecord> dbStateList = currentStates.get(nodeName);
+      for (ZNRecord dbState : dbStateList)
+      {
+        Map<String, Map<String, String>> dbStateMap = dbState.getMapFields();
+        for (String partitionName : dbStateMap.keySet())
+        {
+          Map<String, String> stateMap = dbStateMap
+              .get(partitionName);
+          String state = stateMap
+              .get(CurrentStateProperty.CURRENT_STATE.toString());
+          AssertJUnit.assertTrue(routingMap.get(partitionName).get(state)
+              .contains(nodeName));
+          counter1++;
+        }
+      }
+    }
+
+    for (String partitionName : routingMap.keySet())
+    {
+      Map<String, Set<String>> partitionState = routingMap.get(partitionName);
+      for (String state : partitionState.keySet())
+      {
+        counter2 += partitionState.get(state).size();
+      }
+    }
+    AssertJUnit.assertTrue(counter2 == counter1);
+  }
+
+  // public static void main(String[] args)
+  @Test ()
+  public void testInvocation() throws Exception
+  {
+    String[] dbNames = new String[3];
+    for (int i = 0; i < dbNames.length; i++)
+    {
+      dbNames[i] = "DB_" + i;
+    }
+    String[] nodeNames = new String[6];
+    for (int i = 0; i < nodeNames.length; i++)
+    {
+      nodeNames[i] = "LOCALHOST_100" + i;
+    }
+
+    int[] partitions = new int[dbNames.length];
+    for (int i = 0; i < partitions.length; i++)
+    {
+      partitions[i] = (i + 1) * 10;
+    }
+
+    int[] replicas = new int[dbNames.length];
+    for (int i = 0; i < replicas.length; i++)
+    {
+      replicas[i] = 3;
+    }
+    Map<String, List<ZNRecord>> currentStates = createCurrentStates(dbNames,
+        nodeNames, partitions, replicas);
+    ExternalViewGenerator provider = new ExternalViewGenerator();
+
+    List<ZNRecord> mockIdealStates = new ArrayList<ZNRecord>();
+    for (String dbName : dbNames)
+    {
+      ZNRecord rec = new ZNRecord(dbName);
+      mockIdealStates.add(rec);
+    }
+    List<ZNRecord> externalView = provider.computeExternalView(currentStates,
+        mockIdealStates);
+
+    Map<String, Map<String, Set<String>>> routingMap = provider
+        .getRouterMapFromExternalView(externalView);
+
+    verify(currentStates, routingMap);
+
+    /* write current state and external view to ZK */
+    /*
+     * String clusterName = "test-cluster44"; ZkClient zkClient = new
+     * ZkClient("localhost:2181"); zkClient.setZkSerializer(new
+     * ZNRecordSerializer());
+     *
+     * for(String nodeName : currentStates.keySet()) {
+     * if(zkClient.exists(CMUtil.getCurrentStatePath(clusterName, nodeName))) {
+     * zkClient.deleteRecursive(CMUtil.getCurrentStatePath(clusterName,
+     * nodeName)); } ZKUtil.createChildren(zkClient,CMUtil.getCurrentStatePath
+     * (clusterName, nodeName), currentStates.get(nodeName)); }
+     *
+     * //List<ZNRecord> externalView =
+     * ZKRoutingInfoProvider.computeExternalView(currentStates); String
+     * routingTablePath = CMUtil.getExternalViewPath(clusterName);
+     * if(zkClient.exists(routingTablePath)) {
+     * zkClient.deleteRecursive(routingTablePath); }
+     *
+     * ZKUtil.createChildren(zkClient, CMUtil.getExternalViewPath(clusterName),
+     * externalView);
+     */
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
new file mode 100644
index 0000000..2041da3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecord
+{
+
+  @Test
+  public void testEquals()
+  {
+    ZNRecord record1 = new ZNRecord("id");
+    record1.setSimpleField("k1", "v1");
+    record1.setMapField("m1", new HashMap<String, String>());
+    record1.getMapField("m1").put("k1", "v1");
+    record1.setListField("l1", new ArrayList<String>());
+    record1.getListField("l1").add("v1");
+    ZNRecord record2 = new ZNRecord("id");
+    record2.setSimpleField("k1", "v1");
+    record2.setMapField("m1", new HashMap<String, String>());
+    record2.getMapField("m1").put("k1", "v1");
+    record2.setListField("l1", new ArrayList<String>());
+    record2.getListField("l1").add("v1");
+
+    AssertJUnit.assertEquals(record1, record2);
+    record2.setSimpleField("k2", "v1");
+    AssertJUnit.assertNotSame(record1, record2);
+  }
+
+  @Test
+  public void testMerge()
+  {
+    ZNRecord record = new ZNRecord("record1");
+
+    // set simple field
+    record.setSimpleField("simpleKey1", "simpleValue1");
+
+    // set list field
+    List<String> list1 = new ArrayList<String>();
+    list1.add("list1Value1");
+    list1.add("list1Value2");
+    record.setListField("listKey1", list1);
+
+    // set map field
+    Map<String, String> map1 = new HashMap<String, String>();
+    map1.put("map1Key1", "map1Value1");
+    record.setMapField("mapKey1", map1);
+    // System.out.println(record);
+
+    ZNRecord updateRecord = new ZNRecord("updateRecord");
+
+    // set simple field
+    updateRecord.setSimpleField("simpleKey2", "simpleValue2");
+
+    // set list field
+    List<String> newList1 = new ArrayList<String>();
+    newList1.add("list1Value1");
+    newList1.add("list1Value2");
+    newList1.add("list1NewValue1");
+    newList1.add("list1NewValue2");
+    updateRecord.setListField("listKey1", newList1);
+
+    List<String> list2 = new ArrayList<String>();
+    list2.add("list2Value1");
+    list2.add("list2Value2");
+    updateRecord.setListField("listKey2", list2);
+
+    // set map field
+    Map<String, String> newMap1 = new HashMap<String, String>();
+    newMap1.put("map1NewKey1", "map1NewValue1");
+    updateRecord.setMapField("mapKey1", newMap1);
+
+    Map<String, String> map2 = new HashMap<String, String>();
+    map2.put("map2Key1", "map2Value1");
+    updateRecord.setMapField("mapKey2", map2);
+    // System.out.println(updateRecord);
+
+    record.merge(updateRecord);
+    // System.out.println(record);
+
+    ZNRecord expectRecord = new ZNRecord("record1");
+    expectRecord.setSimpleField("simpleKey1", "simpleValue1");
+    expectRecord.setSimpleField("simpleKey2", "simpleValue2");
+    List<String> expectList1 = new ArrayList<String>();
+    expectList1.add("list1Value1");
+    expectList1.add("list1Value2");
+    expectList1.add("list1Value1");
+    expectList1.add("list1Value2");
+    expectList1.add("list1NewValue1");
+    expectList1.add("list1NewValue2");
+    expectRecord.setListField("listKey1", expectList1);
+    List<String> expectList2 = new ArrayList<String>();
+    expectList2.add("list2Value1");
+    expectList2.add("list2Value2");
+    expectRecord.setListField("listKey2", expectList2);
+    Map<String, String> expectMap1 = new HashMap<String, String>();
+    expectMap1.put("map1Key1", "map1Value1");
+    expectMap1.put("map1NewKey1", "map1NewValue1");
+    expectRecord.setMapField("mapKey1", expectMap1);
+    Map<String, String> expectMap2 = new HashMap<String, String>();
+    expectMap2.put("map2Key1", "map2Value1");
+    expectRecord.setMapField("mapKey2", expectMap2);
+    Assert.assertEquals(record, expectRecord, "Should be equal.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java b/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
new file mode 100644
index 0000000..3576177
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
@@ -0,0 +1,41 @@
+package org.apache.helix;
+
+import org.apache.helix.ZNRecordBucketizer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecordBucketizer
+{
+  @Test
+  public void testZNRecordBucketizer()
+  {
+    final int bucketSize = 3;
+    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
+    String[] partitionNames =
+        { "TestDB_0", "TestDB_1", "TestDB_2", "TestDB_3", "TestDB_4" };
+    for (int i = 0; i < partitionNames.length; i++)
+    {
+      String partitionName = partitionNames[i];
+      String bucketName = bucketizer.getBucketName(partitionName);
+      int startBucketNb = i / bucketSize * bucketSize;
+      int endBucketNb = startBucketNb + bucketSize - 1;
+      String expectBucketName = "TestDB_p" + startBucketNb + "-p" + endBucketNb;
+      System.out.println("Expect: " + expectBucketName + ", actual: " + bucketName);
+      Assert.assertEquals(expectBucketName, bucketName);
+      
+    }
+    
+//    ZNRecord record = new ZNRecord("TestDB");
+//    record.setSimpleField("key0", "value0");
+//    record.setSimpleField("key1", "value1");
+//    record.setListField("TestDB_0", Arrays.asList("localhost_00", "localhost_01"));
+//    record.setListField("TestDB_1", Arrays.asList("localhost_10", "localhost_11"));
+//    record.setListField("TestDB_2", Arrays.asList("localhost_20", "localhost_21"));
+//    record.setListField("TestDB_3", Arrays.asList("localhost_30", "localhost_31"));
+//    record.setListField("TestDB_4", Arrays.asList("localhost_40", "localhost_41"));
+//    
+//    System.out.println(bucketizer.bucketize(record));
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
new file mode 100644
index 0000000..9b3fcdb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkClientWrapper extends ZkUnitTestBase
+{
+	ZkClient _zkClient;
+
+	@BeforeClass
+	public void beforeClass()
+	{
+		_zkClient = new ZkClient(ZK_ADDR);
+		_zkClient.setZkSerializer(new ZNRecordSerializer());
+	}
+
+	@AfterClass
+	public void afterClass()
+	{
+		_zkClient.close();
+	}
+
+	@Test ()
+	void testGetStat()
+	{
+		String path = "/tmp/getStatTest";
+		_zkClient.deleteRecursive(path);
+
+		Stat stat, newStat;
+		stat = _zkClient.getStat(path);
+		AssertJUnit.assertNull(stat);
+		_zkClient.createPersistent(path, true);
+
+		stat = _zkClient.getStat(path);
+		AssertJUnit.assertNotNull(stat);
+
+		newStat = _zkClient.getStat(path);
+		AssertJUnit.assertEquals(stat, newStat);
+
+		_zkClient.writeData(path, new ZNRecord("Test"));
+		newStat = _zkClient.getStat(path);
+		AssertJUnit.assertNotSame(stat, newStat);
+	}
+
+  @Test ()
+	void testSessioExpire()
+	{
+		IZkStateListener listener = new IZkStateListener()
+		{
+
+			@Override
+			public void handleStateChanged(KeeperState state) throws Exception
+			{
+				System.out.println("In Old connection New state " + state);
+			}
+
+			@Override
+			public void handleNewSession() throws Exception
+			{
+				System.out.println("In Old connection New session");
+			}
+		};
+		_zkClient.subscribeStateChanges(listener);
+		ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+		ZooKeeper zookeeper = connection.getZookeeper();
+		System.out.println("old sessionId= " + zookeeper.getSessionId());
+		try
+		{
+			Watcher watcher = new Watcher(){
+				@Override
+        public void process(WatchedEvent event)
+        {
+					System.out.println("In New connection In process event:"+ event);
+        }
+			};
+			ZooKeeper newZookeeper = new ZooKeeper(connection.getServers(),
+			    zookeeper.getSessionTimeout(), watcher , zookeeper.getSessionId(),
+			    zookeeper.getSessionPasswd());
+			Thread.sleep(3000);
+			System.out.println("New sessionId= " + newZookeeper.getSessionId());
+			Thread.sleep(3000);
+			newZookeeper.close();
+			Thread.sleep(10000);
+			connection = ((ZkConnection) _zkClient.getConnection());
+			zookeeper = connection.getZookeeper();
+			System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+		} catch (Exception e)
+		{
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java b/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
new file mode 100644
index 0000000..fdf7887
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+
+// TODO fix this test
+public class TestZkConnectionCount extends ZkUnitTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestZkConnectionCount.class);
+
+  // @Test ()
+  public void testZkConnectionCount()
+  {
+
+//    ZkClient zkClient;
+//    int nrOfConn = ZkClient.getNumberOfConnections();
+//    System.out.println("Number of zk connections made " + nrOfConn);
+//
+//    ZkConnection zkConn = new ZkConnection(ZK_ADDR);
+//
+//    zkClient = new ZkClient(zkConn);
+//    AssertJUnit.assertEquals(nrOfConn + 1, ZkClient.getNumberOfConnections());
+//
+//    zkClient = new ZkClient(ZK_ADDR);
+//    AssertJUnit.assertEquals(nrOfConn + 2, ZkClient.getNumberOfConnections());
+//
+//    zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+//    AssertJUnit.assertEquals(nrOfConn + 2, ZkClient.getNumberOfConnections());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
new file mode 100644
index 0000000..fa6b76f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -0,0 +1,278 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.tools.TestCommand;
+import org.apache.helix.tools.TestExecutor;
+import org.apache.helix.tools.TestTrigger;
+import org.apache.helix.tools.ZnodeOpArg;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZnodeModify extends ZkUnitTestBase
+{
+  private static Logger logger = Logger.getLogger(TestZnodeModify.class);
+  private final String PREFIX = "/" + getShortClassName();
+
+  @Test ()
+  public void testBasic() throws Exception
+  {
+    logger.info("RUN: " + new Date(System.currentTimeMillis()));
+    List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+    // test case for the basic flow, no timeout, no data trigger
+    String pathChild1 = PREFIX + "/basic_child1";
+    String pathChild2 = PREFIX + "/basic_child2";
+
+    TestCommand command;
+    ZnodeOpArg arg;
+    arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1");
+    command = new TestCommand(CommandType.MODIFY, arg);
+    commandList.add(command);
+
+    List<String> list = new ArrayList<String>();
+    list.add("listValue1");
+    list.add("listValue2");
+    arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.LIST, "+", "key2", list);
+    command = new TestCommand(CommandType.MODIFY, arg);
+    commandList.add(command);
+
+    ZNRecord record = getExampleZNRecord();
+    arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", record);
+    command = new TestCommand(CommandType.MODIFY, arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "==", "key1");
+    command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, "simpleValue1"), arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.LIST, "==", "key2");
+    command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, list), arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "==");
+    command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, record), arg);
+    commandList.add(command);
+
+    Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+    for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+    {
+      Assert.assertTrue(entry.getValue());
+    }
+
+    logger.info("END: " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test ()
+  public void testDataTrigger() throws Exception
+  {
+    logger.info("RUN: " + new Date(System.currentTimeMillis()));
+    List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+    // test case for data trigger, no timeout
+    String pathChild1 = PREFIX + "/data_trigger_child1";
+    String pathChild2 = PREFIX + "/data_trigger_child2";
+
+    ZnodeOpArg arg;
+    TestCommand command;
+
+    ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1-new");
+    TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 0, "simpleValue1"), arg1);
+    commandList.add(command1);
+
+    ZNRecord record = getExampleZNRecord();
+    ZNRecord recordNew = new ZNRecord(record);
+    recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+    arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", recordNew);
+    command = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 3000, record), arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", record);
+    command = new TestCommand(CommandType.MODIFY, new TestTrigger(1000), arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "!=", "key1");
+    command = new TestCommand(CommandType.VERIFY, new TestTrigger(3100, 0, "simpleValue1-new"), arg);
+    commandList.add(command);
+
+    arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "==");
+    command = new TestCommand(CommandType.VERIFY, new TestTrigger(3100, 0, recordNew), arg);
+    commandList.add(command);
+
+    Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+
+    boolean result = results.remove(command1).booleanValue();
+    AssertJUnit.assertFalse(result);
+    for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+    {
+      Assert.assertTrue(entry.getValue());
+    }
+
+    logger.info("END: " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test ()
+  public void testTimeout() throws Exception
+  {
+    logger.info("RUN: " + new Date(System.currentTimeMillis()));
+    List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+    // test case for timeout, no data trigger
+    String pathChild1 = PREFIX + "/timeout_child1";
+    String pathChild2 = PREFIX + "/timeout_child2";
+
+    ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1-new");
+    TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 1000, "simpleValue1"), arg1);
+    commandList.add(command1);
+
+    ZNRecord record = getExampleZNRecord();
+    ZNRecord recordNew = new ZNRecord(record);
+    recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+    arg1 = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", recordNew);
+    command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 500, record), arg1);
+    commandList.add(command1);
+
+    arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "==", "key1");
+    command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 500, "simpleValue1-new"), arg1);
+    commandList.add(command1);
+
+    arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "==");
+    command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 500, recordNew), arg1);
+    commandList.add(command1);
+
+    Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+    for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+    {
+      Assert.assertFalse(entry.getValue());
+    }
+
+    logger.info("END: " + new Date(System.currentTimeMillis()));
+  }
+
+
+  @Test ()
+  public void testDataTriggerWithTimeout() throws Exception
+  {
+    logger.info("RUN: " + new Date(System.currentTimeMillis()));
+    List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+    // test case for data trigger with timeout
+    final String pathChild1 = PREFIX + "/dataTriggerWithTimeout_child1";
+
+    final ZNRecord record = getExampleZNRecord();
+    ZNRecord recordNew = new ZNRecord(record);
+    recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+    ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "+", recordNew);
+    TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 8000, record), arg1);
+    commandList.add(command1);
+
+    arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "==");
+    command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(9000, 500, recordNew), arg1);
+    commandList.add(command1);
+
+    // start a separate thread to change znode at pathChild1
+    new Thread()
+    {
+      @Override
+      public void run()
+      {
+        try
+        {
+          Thread.sleep(3000);
+          final ZkClient zkClient = new ZkClient(ZK_ADDR);
+          zkClient.setZkSerializer(new ZNRecordSerializer());
+          zkClient.createPersistent(pathChild1, true);
+          zkClient.writeData(pathChild1, record);
+        }
+        catch (InterruptedException e)
+        {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    }.start();
+
+    Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+    for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+    {
+      Assert.assertTrue(entry.getValue());
+      // System.out.println(entry.getValue() + ":" + entry.getKey());
+    }
+
+  }
+
+  ZkClient _zkClient;
+
+  @BeforeClass ()
+  public void beforeClass()
+  {
+    System.out.println("START " + getShortClassName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    if (_zkClient.exists(PREFIX))
+    {
+      _zkClient.deleteRecursive(PREFIX);
+    }
+
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+  	_zkClient.close();
+
+    System.out.println("END " + getShortClassName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  private ZNRecord getExampleZNRecord()
+  {
+    ZNRecord record = new ZNRecord("TestDB");
+    record.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.CUSTOMIZED.toString());
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("localhost_12918", "MASTER");
+    map.put("localhost_12919", "SLAVE");
+    record.setMapField("TestDB_0", map);
+
+    List<String> list = new ArrayList<String>();
+    list.add("localhost_12918");
+    list.add("localhost_12919");
+    record.setListField("TestDB_0", list);
+    return record;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
new file mode 100644
index 0000000..5839f4f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -0,0 +1,175 @@
+package org.apache.helix;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+
+public class ZkTestHelper
+{
+  private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
+
+  static
+  {
+    // Logger.getRootLogger().setLevel(Level.DEBUG);
+  }
+
+  // zkClusterManager that exposes zkclient
+  public static class TestZkHelixManager extends ZKHelixManager
+  {
+
+    public TestZkHelixManager(String clusterName,
+                              String instanceName,
+                              InstanceType instanceType,
+                              String zkConnectString) throws Exception
+    {
+      super(clusterName, instanceName, instanceType, zkConnectString);
+      // TODO Auto-generated constructor stub
+    }
+
+    public ZkClient getZkClient()
+    {
+      return _zkClient;
+    }
+
+  }
+
+  public static void expireSession(final ZkClient zkClient) throws Exception
+  {
+    final CountDownLatch waitExpire = new CountDownLatch(1);
+
+    IZkStateListener listener = new IZkStateListener()
+    {
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception
+      {
+        // System.err.println("handleStateChanged. state: " + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception
+      {
+        // make sure zkclient is connected again
+        zkClient.waitUntilConnected();
+
+        ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+        ZooKeeper curZookeeper = connection.getZookeeper();
+
+        LOG.info("handleNewSession. sessionId: "
+            + Long.toHexString(curZookeeper.getSessionId()));
+        waitExpire.countDown();
+      }
+    };
+
+    zkClient.subscribeStateChanges(listener);
+
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+    ZooKeeper curZookeeper = connection.getZookeeper();
+    LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+
+    Watcher watcher = new Watcher()
+    {
+      @Override
+      public void process(WatchedEvent event)
+      {
+        LOG.info("Process watchEvent: " + event);
+      }
+    };
+
+    final ZooKeeper dupZookeeper =
+        new ZooKeeper(connection.getServers(),
+                      curZookeeper.getSessionTimeout(),
+                      watcher,
+                      curZookeeper.getSessionId(),
+                      curZookeeper.getSessionPasswd());
+    // wait until connected, then close
+    while (dupZookeeper.getState() != States.CONNECTED)
+    {
+      Thread.sleep(10);
+    }
+    dupZookeeper.close();
+
+    // make sure session expiry really happens
+    waitExpire.await();
+    zkClient.unsubscribeStateChanges(listener);
+
+    connection = (ZkConnection) zkClient.getConnection();
+    curZookeeper = connection.getZookeeper();
+
+    // System.err.println("zk: " + oldZookeeper);
+    LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+  }
+
+  /*
+   * stateMap: partition->instance->state
+   */
+  public static boolean verifyState(ZkClient zkclient,
+                                    String clusterName,
+                                    String resourceName,
+                                    Map<String, Map<String, String>> expectStateMap,
+                                    String op)
+  {
+    boolean result = true;
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(zkclient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
+    Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
+    for (String partition : actualStateMap.keySet())
+    {
+      for (String expectPartiton : expectStateMap.keySet())
+      {
+        if (!partition.matches(expectPartiton))
+        {
+          continue;
+        }
+
+        Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
+        Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
+        for (String instance : actualInstanceStateMap.keySet())
+        {
+          for (String expectInstance : expectStateMap.get(expectPartiton).keySet())
+          {
+            if (!instance.matches(expectInstance))
+            {
+              continue;
+            }
+
+            String actualState = actualInstanceStateMap.get(instance);
+            String expectState = expectInstanceStateMap.get(expectInstance);
+            boolean equals = expectState.equals(actualState);
+            if (op.equals("==") && !equals || op.equals("!=") && equals)
+            {
+              System.out.println(partition + "/" + instance
+                  + " state mismatch. actual state: " + actualState + ", but expect: "
+                  + expectState + ", op: " + op);
+              result = false;
+            }
+
+          }
+        }
+
+      }
+    }
+    return result;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
new file mode 100644
index 0000000..09d48a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -0,0 +1,433 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+// TODO merge code with ZkIntegrationTestBase
+public class ZkUnitTestBase
+{
+  private static Logger LOG = Logger.getLogger(ZkUnitTestBase.class);
+  protected static ZkServer _zkServer = null;
+  protected static ZkClient _gZkClient;
+
+  public static final String ZK_ADDR = "localhost:2185";
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+
+  @BeforeSuite(alwaysRun = true)
+  public void beforeSuite() throws Exception
+  {
+    _zkServer = TestHelper.startZkSever(ZK_ADDR);
+    AssertJUnit.assertTrue(_zkServer != null);
+
+    // System.out.println("Number of open zkClient before ZkUnitTests: "
+    // + ZkClient.getNumberOfConnections());
+
+    _gZkClient = new ZkClient(ZK_ADDR);
+    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterSuite(alwaysRun = true)
+  public void afterTest()
+  {
+    TestHelper.stopZkServer(_zkServer);
+    _zkServer = null;
+    _gZkClient.close();
+
+    // System.out.println("Number of open zkClient after ZkUnitTests: "
+    // + ZkClient.getNumberOfConnections());
+
+  }
+
+  protected String getShortClassName()
+  {
+    String className = this.getClass().getName();
+    return className.substring(className.lastIndexOf('.') + 1);
+  }
+
+  protected String getCurrentLeader(ZkClient zkClient, String clusterName)
+  {
+    String leaderPath =
+        HelixUtil.getControllerPropertyPath(clusterName, PropertyType.LEADER);
+    ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
+    if (leaderRecord == null)
+    {
+      return null;
+    }
+
+    String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString());
+    return leader;
+  }
+
+  protected void stopCurrentLeader(ZkClient zkClient,
+                                   String clusterName,
+                                   Map<String, Thread> threadMap,
+                                   Map<String, HelixManager> managerMap)
+  {
+    String leader = getCurrentLeader(zkClient, clusterName);
+    Assert.assertTrue(leader != null);
+    System.out.println("stop leader:" + leader + " in " + clusterName);
+    Assert.assertTrue(leader != null);
+
+    HelixManager manager = managerMap.remove(leader);
+    Assert.assertTrue(manager != null);
+    manager.disconnect();
+
+    Thread thread = threadMap.remove(leader);
+    Assert.assertTrue(thread != null);
+    thread.interrupt();
+
+    boolean isNewLeaderElected = false;
+    try
+    {
+      // Thread.sleep(2000);
+      for (int i = 0; i < 5; i++)
+      {
+        Thread.sleep(1000);
+        String newLeader = getCurrentLeader(zkClient, clusterName);
+        if (!newLeader.equals(leader))
+        {
+          isNewLeaderElected = true;
+          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+          break;
+        }
+      }
+    }
+    catch (InterruptedException e)
+    {
+      e.printStackTrace();
+    }
+    if (isNewLeaderElected == false)
+    {
+      System.out.println("fail to elect a new leader elected in " + clusterName);
+    }
+    AssertJUnit.assertTrue(isNewLeaderElected);
+  }
+
+  public void verifyInstance(ZkClient zkClient,
+                             String clusterName,
+                             String instance,
+                             boolean wantExists)
+  {
+    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+    String instanceConfigsPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    String instanceConfigPath = instanceConfigsPath + "/" + instance;
+    String instancePath = HelixUtil.getInstancePath(clusterName, instance);
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
+  }
+
+  public void verifyResource(ZkClient zkClient,
+                             String clusterName,
+                             String resource,
+                             boolean wantExists)
+  {
+    String resourcePath = HelixUtil.getIdealStatePath(clusterName) + "/" + resource;
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
+  }
+
+  public void verifyEnabled(ZkClient zkClient,
+                            String clusterName,
+                            String instance,
+                            boolean wantEnabled)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
+    AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
+  }
+
+  public void verifyReplication(ZkClient zkClient,
+                                String clusterName,
+                                String resource,
+                                int repl)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
+    for (String partitionName : idealState.getPartitionSet())
+    {
+      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
+      {
+        AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
+      }
+      else if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+      {
+        AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName)
+                                                 .size());
+      }
+    }
+  }
+
+  protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
+      InterruptedException
+  {
+    ZooKeeper oldZookeeper = zkConnection.getZookeeper();
+    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+    Watcher watcher = new Watcher()
+    {
+      @Override
+      public void process(WatchedEvent event)
+      {
+        LOG.info("In New connection, process event:" + event);
+      }
+    };
+
+    ZooKeeper newZookeeper =
+        new ZooKeeper(zkConnection.getServers(),
+                      oldZookeeper.getSessionTimeout(),
+                      watcher,
+                      oldZookeeper.getSessionId(),
+                      oldZookeeper.getSessionPasswd());
+    LOG.info("New sessionId = " + newZookeeper.getSessionId());
+    // Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    oldZookeeper = zkConnection.getZookeeper();
+    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+  }
+
+  protected void simulateSessionExpiry(ZkClient zkClient) throws IOException,
+      InterruptedException
+  {
+    IZkStateListener listener = new IZkStateListener()
+    {
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception
+      {
+        LOG.info("In Old connection, state changed:" + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception
+      {
+        LOG.info("In Old connection, new session");
+      }
+    };
+    zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+    ZooKeeper oldZookeeper = connection.getZookeeper();
+    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+    Watcher watcher = new Watcher()
+    {
+      @Override
+      public void process(WatchedEvent event)
+      {
+        LOG.info("In New connection, process event:" + event);
+      }
+    };
+
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(),
+                      oldZookeeper.getSessionTimeout(),
+                      watcher,
+                      oldZookeeper.getSessionId(),
+                      oldZookeeper.getSessionPasswd());
+    LOG.info("New sessionId = " + newZookeeper.getSessionId());
+    // Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = (ZkConnection) zkClient.getConnection();
+    oldZookeeper = connection.getZookeeper();
+    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+  }
+
+  protected void setupStateModel(String clusterName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    StateModelConfigGenerator generator = new StateModelConfigGenerator();
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(generator.generateConfigForMasterSlave());
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
+
+    StateModelDefinition leaderStandby =
+        new StateModelDefinition(generator.generateConfigForLeaderStandby());
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
+
+    StateModelDefinition onlineOffline =
+        new StateModelDefinition(generator.generateConfigForOnlineOffline());
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
+
+  }
+
+  protected List<IdealState> setupIdealState(String clusterName,
+                                             int[] nodes,
+                                             String[] resources,
+                                             int partitions,
+                                             int replicas)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    List<IdealState> idealStates = new ArrayList<IdealState>();
+    List<String> instances = new ArrayList<String>();
+    for (int i : nodes)
+    {
+      instances.add("localhost_" + i);
+    }
+
+    for (String resourceName : resources)
+    {
+      IdealState idealState = new IdealState(resourceName);
+      for (int p = 0; p < partitions; p++)
+      {
+        List<String> value = new ArrayList<String>();
+        for (int r = 0; r < replicas; r++)
+        {
+          int n = nodes[(p + r) % nodes.length];
+          value.add("localhost_" + n);
+        }
+        idealState.getRecord().setListField(resourceName + "_" + p, value);
+      }
+
+      idealState.setReplicas(Integer.toString(replicas));
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+      // System.out.println(idealState);
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  protected void setupLiveInstances(String clusterName, int[] liveInstances)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    for (int i = 0; i < liveInstances.length; i++)
+    {
+      String instance = "localhost_" + liveInstances[i];
+      LiveInstance liveInstance = new LiveInstance(instance);
+      liveInstance.setSessionId("session_" + liveInstances[i]);
+      liveInstance.setHelixVersion("0.0.0");
+      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+    }
+  }
+
+  protected void setupInstances(String clusterName, int[] instances)
+  {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    for (int i = 0; i < instances.length; i++)
+    {
+      String instance = "localhost_" + instances[i];
+      InstanceConfig instanceConfig = new InstanceConfig(instance);
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort("" + instances[i]);
+      instanceConfig.setInstanceEnabled(true);
+      admin.addInstance(clusterName, instanceConfig);
+    }
+  }
+
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline)
+  {
+    try
+    {
+      pipeline.handle(event);
+      pipeline.finish();
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception while executing pipeline:" + pipeline
+          + ". Will not continue to next pipeline", e);
+    }
+  }
+
+  protected void runStage(ClusterEvent event, Stage stage) throws Exception
+  {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  protected Message createMessage(MessageType type,
+                                  String msgId,
+                                  String fromState,
+                                  String toState,
+                                  String resourceName,
+                                  String tgtName)
+  {
+    Message msg = new Message(type.toString(), msgId);
+    msg.setFromState(fromState);
+    msg.setToState(toState);
+    msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
+    msg.setTgtName(tgtName);
+    return msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
new file mode 100644
index 0000000..acce1f5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestAddAlerts
+{
+
+  protected static final String CLUSTER_NAME = "TestCluster";
+
+  MockManager                   _helixManager;
+  AlertsHolder                  _alertsHolder;
+
+  public final String           EXP          = AlertParser.EXPRESSION_NAME;
+  public final String           CMP          = AlertParser.COMPARATOR_NAME;
+  public final String           CON          = AlertParser.CONSTANT_NAME;
+
+  @BeforeMethod()
+  public void setup()
+  {
+    _helixManager = new MockManager(CLUSTER_NAME);
+    _alertsHolder = new AlertsHolder(_helixManager, new HealthDataCache());
+  }
+
+  public boolean alertRecordContains(ZNRecord rec, String alertName)
+  {
+    Map<String, Map<String, String>> alerts = rec.getMapFields();
+    return alerts.containsKey(alertName);
+  }
+
+  public int alertsSize(ZNRecord rec)
+  {
+    Map<String, Map<String, String>> alerts = rec.getMapFields();
+    return alerts.size();
+  }
+
+  @Test()
+  public void testAddAlert() throws Exception
+  {
+    String alert =
+        EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+            + "(10)";
+    _alertsHolder.addAlert(alert);
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+    System.out.println("alert: " + alert);
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(alertRecordContains(rec, alert));
+    AssertJUnit.assertEquals(1, alertsSize(rec));
+  }
+
+  @Test()
+  public void testAddTwoAlerts() throws Exception
+  {
+    String alert1 =
+        EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+            + "(10)";
+    String alert2 =
+        EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+            + "(100)";
+    _alertsHolder.addAlert(alert1);
+    _alertsHolder.addAlert(alert2);
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+    // System.out.println("alert: "+alert1);
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(alertRecordContains(rec, alert1));
+    AssertJUnit.assertTrue(alertRecordContains(rec, alert2));
+    AssertJUnit.assertEquals(2, alertsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddTwoWildcardAlert() throws Exception
+  {
+    String alert1 =
+        EXP + "(accumulate()(dbFoo.partition*.put*))" + CMP + "(GREATER)" + CON + "(10)";
+    _alertsHolder.addAlert(alert1);
+    
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+    // System.out.println("alert: "+alert1);
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(alertRecordContains(rec, alert1));
+    AssertJUnit.assertEquals(1, alertsSize(rec));
+  }
+
+  // add 2 wildcard alert here
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
new file mode 100644
index 0000000..be3b1d9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestAddPersistentStats
+{
+
+  protected static final String CLUSTER_NAME = "TestCluster";
+
+  MockManager                   _helixManager;
+  StatsHolder                   _statsHolder;
+
+  @BeforeMethod(groups = { "unitTest" })
+  public void setup()
+  {
+    _helixManager = new MockManager(CLUSTER_NAME);
+    _statsHolder = new StatsHolder(_helixManager, new HealthDataCache());
+  }
+
+  public boolean statRecordContains(ZNRecord rec, String statName)
+  {
+    Map<String, Map<String, String>> stats = rec.getMapFields();
+    return stats.containsKey(statName);
+  }
+
+  public int statsSize(ZNRecord rec)
+  {
+    Map<String, Map<String, String>> stats = rec.getMapFields();
+    return stats.size();
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddStat() throws Exception
+  {
+    String stat = "window(5)(dbFoo.partition10.latency)";
+    _statsHolder.addStat(stat);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordContains(rec, stat));
+    AssertJUnit.assertEquals(1, statsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddTwoStats() throws Exception
+  {
+    String stat1 = "window(5)(dbFoo.partition10.latency)";
+    _statsHolder.addStat(stat1);
+    _statsHolder.persistStats();
+    String stat2 = "window(5)(dbFoo.partition11.latency)";
+    _statsHolder.addStat(stat2);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordContains(rec, stat1));
+    AssertJUnit.assertTrue(statRecordContains(rec, stat2));
+    AssertJUnit.assertEquals(2, statsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddDuplicateStat() throws Exception
+  {
+    String stat = "window(5)(dbFoo.partition10.latency)";
+    _statsHolder.addStat(stat);
+    _statsHolder.addStat(stat);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordContains(rec, stat));
+    AssertJUnit.assertEquals(1, statsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddPairOfStats() throws Exception
+  {
+    String exp = "accumulate()(dbFoo.partition10.latency, dbFoo.partition10.count)";
+    _statsHolder.addStat(exp);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordContains(rec,
+                                              "accumulate()(dbFoo.partition10.latency)"));
+    AssertJUnit.assertTrue(statRecordContains(rec,
+                                              "accumulate()(dbFoo.partition10.count)"));
+    AssertJUnit.assertEquals(2, statsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddStatsWithOperators() throws Exception
+  {
+    String exp =
+        "accumulate()(dbFoo.partition10.latency, dbFoo.partition10.count)|EACH|ACCUMULATE|DIVIDE";
+    _statsHolder.addStat(exp);
+    _statsHolder.persistStats();
+
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+    System.out.println("rec: " + rec.toString());
+    AssertJUnit.assertTrue(statRecordContains(rec,
+                                              "accumulate()(dbFoo.partition10.latency)"));
+    AssertJUnit.assertTrue(statRecordContains(rec,
+                                              "accumulate()(dbFoo.partition10.count)"));
+    AssertJUnit.assertEquals(2, statsSize(rec));
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddNonExistentAggregator() throws Exception
+  {
+    String exp = "fakeagg()(dbFoo.partition10.latency)";
+    boolean caughtException = false;
+    try
+    {
+      _statsHolder.addStat(exp);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testGoodAggregatorBadArgs() throws Exception
+  {
+    String exp = "accumulate(10)(dbFoo.partition10.latency)";
+    boolean caughtException = false;
+    try
+    {
+      _statsHolder.addStat(exp);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddBadNestingStat1() throws Exception
+  {
+    String exp = "window((5)(dbFoo.partition10.latency)";
+    boolean caughtException = false;
+    try
+    {
+      _statsHolder.addStat(exp);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+  }
+
+  @Test(groups = { "unitTest" })
+  public void testAddBadNestingStat2() throws Exception
+  {
+    String exp = "window(5)(dbFoo.partition10.latency))";
+    boolean caughtException = false;
+    try
+    {
+      _statsHolder.addStat(exp);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
new file mode 100644
index 0000000..bb08b6c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.alerts.AlertParser;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test
+public class TestAlertValidation {
+
+	public final String EXP = AlertParser.EXPRESSION_NAME;
+	public final String CMP = AlertParser.COMPARATOR_NAME;
+	public final String CON = AlertParser.CONSTANT_NAME;
+
+	@Test
+	public void testSimple() {
+		String alertName = EXP + "(accumulate()(dbFoo.partition10.latency)) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertFalse(caughtException);
+	}
+	
+	@Test
+	public void testSingleInSingleOut() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition10.latency)|EXPAND) " + CMP
+				+ "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertFalse(caughtException);
+	}
+
+	@Test
+	public void testDoubleInDoubleOut() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition10.latency, dbFoo.partition11.latency)|EXPAND) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertTrue(caughtException);
+	}
+
+	@Test
+	public void testTwoStageOps() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EXPAND|DIVIDE) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertFalse(caughtException);
+	}
+
+	@Test
+	public void testTwoListsIntoOne() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition10.latency, dbFoo.partition11.count)|SUM) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertFalse(caughtException);
+	}
+
+	@Test
+	public void testSumEach() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EXPAND|SUMEACH|DIVIDE) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertFalse(caughtException);
+	}
+
+	@Test
+	public void testNeedTwoTuplesGetOne() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition*.latency)|EXPAND|DIVIDE) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertTrue(caughtException);
+	}
+
+	@Test
+	public void testExtraPipe() {
+		String alertName = EXP + "(accumulate()(dbFoo.partition10.latency)|) "
+				+ CMP + "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertTrue(caughtException);
+	}
+
+	@Test
+	public void testAlertUnknownOp() {
+		String alertName = EXP
+				+ "(accumulate()(dbFoo.partition10.latency)|BADOP) " + CMP
+				+ "(GREATER) " + CON + "(10)";
+		boolean caughtException = false;
+		try {
+			AlertParser.validateAlert(alertName);
+		} catch (HelixException e) {
+			caughtException = true;
+			e.printStackTrace();
+		}
+		AssertJUnit.assertTrue(caughtException);
+	}
+}