You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[37/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
new file mode 100644
index 0000000..890d862
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
@@ -0,0 +1,152 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Exercises live-instance change notifications: an ADMINISTRATOR connection registers a
+ * {@link LiveInstanceChangeListener} and the test checks the callbacks delivered when a
+ * participant joins, has its live-instance node updated, and leaves the cluster.
+ */
+public class TestZKLiveInstanceData extends ZkUnitTestBase
+{
+  private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+  /**
+   * Walks one participant through join / update / leave and asserts the list handed to the
+   * listener after each step. Both manager connections are released in {@code finally}
+   * blocks so a failed assertion does not leave a connection open for later tests.
+   *
+   * @throws Exception if connecting, updating, or waiting on the callback queue fails
+   */
+  @Test
+  public void testDataChange() throws Exception
+  {
+    // Create an admin and add LiveInstanceChange listener to it
+    HelixManager adminManager =
+        HelixManagerFactory.getZKHelixManager(clusterName,
+                                              null,
+                                              InstanceType.ADMINISTRATOR,
+                                              ZK_ADDR);
+    adminManager.connect();
+    try
+    {
+      final BlockingQueue<List<LiveInstance>> changeList =
+          new LinkedBlockingQueue<List<LiveInstance>>();
+
+      adminManager.addLiveInstanceChangeListener(new LiveInstanceChangeListener()
+      {
+        @Override
+        public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+                                         NotificationContext changeContext)
+        {
+          // The queue is basically unbounded, so shouldn't throw exception when calling
+          // "add".
+          changeList.add(deepCopy(liveInstances));
+        }
+      });
+
+      // Check the initial condition
+      List<LiveInstance> instances = changeList.poll(1, TimeUnit.SECONDS);
+      Assert.assertNotNull(instances, "Expecting a list of live instance");
+      Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
+
+      // Join as participant, should trigger a live instance change event
+      HelixManager manager =
+          HelixManagerFactory.getZKHelixManager(clusterName,
+                                                "localhost_54321",
+                                                InstanceType.PARTICIPANT,
+                                                ZK_ADDR);
+      manager.connect();
+      // Tracks whether the finally block still has to disconnect the participant.
+      boolean participantConnected = true;
+      try
+      {
+        instances = changeList.poll(1, TimeUnit.SECONDS);
+        Assert.assertNotNull(instances, "Expecting a list of live instance");
+        Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
+        Assert.assertEquals(instances.get(0).getInstanceName(), manager.getInstanceName());
+
+        // Update data in the live instance node, should trigger another live instance
+        // change event
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        PropertyKey propertyKey =
+            helixDataAccessor.keyBuilder().liveInstance(manager.getInstanceName());
+        LiveInstance instance = helixDataAccessor.getProperty(propertyKey);
+
+        Map<String, String> map = new TreeMap<String, String>();
+        map.put("k1", "v1");
+        instance.getRecord().setMapField("test", map);
+        Assert.assertTrue(helixDataAccessor.updateProperty(propertyKey, instance),
+                          "Failed to update live instance node");
+
+        instances = changeList.poll(1, TimeUnit.SECONDS);
+        Assert.assertNotNull(instances, "Expecting a list of live instance");
+        // Guard the get(0) below so an empty notification fails with a clear message.
+        Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
+        Assert.assertEquals(instances.get(0).getRecord().getMapField("test"),
+                            map,
+                            "Wrong map data.");
+
+        // Flip the flag first: if disconnect() itself throws we must not call it twice.
+        participantConnected = false;
+        manager.disconnect();
+        Thread.sleep(1000); // wait for callback finish
+
+        instances = changeList.poll(1, TimeUnit.SECONDS);
+        Assert.assertNotNull(instances, "Expecting a list of live instance");
+        Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
+      }
+      finally
+      {
+        if (participantConnected)
+        {
+          manager.disconnect();
+        }
+      }
+    }
+    finally
+    {
+      adminManager.disconnect();
+    }
+  }
+
+  /**
+   * Removes any existing znode tree for this test's cluster, then recreates the cluster
+   * and registers two nodes through the {@link ClusterSetup} command line entry point.
+   *
+   * @throws Exception if the ZooKeeper cleanup or cluster setup fails
+   */
+  @BeforeClass()
+  public void beforeClass() throws Exception
+  {
+    ZkClient zkClient = null;
+    try
+    {
+      zkClient = new ZkClient(ZK_ADDR);
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      if (zkClient.exists("/" + clusterName))
+      {
+        zkClient.deleteRecursive("/" + clusterName);
+      }
+    }
+    finally
+    {
+      if (zkClient != null)
+      {
+        zkClient.close();
+      }
+    }
+
+    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                                ZK_ADDR,
+                                                "-addCluster",
+                                                clusterName));
+    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                                ZK_ADDR,
+                                                "-addNode",
+                                                clusterName,
+                                                "localhost:54321"));
+    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                                ZK_ADDR,
+                                                "-addNode",
+                                                clusterName,
+                                                "localhost:54322"));
+  }
+
+  /**
+   * Varargs convenience that returns its arguments as a {@code String[]}.
+   */
+  private String[] getArgs(String... args)
+  {
+    return args;
+  }
+
+  /**
+   * Builds a new list holding a new {@link LiveInstance} per input element, each
+   * constructed from the original element's record.
+   * NOTE(review): whether the LiveInstance constructor copies the record is not visible
+   * here - confirm before relying on full isolation from later mutations.
+   */
+  private List<LiveInstance> deepCopy(List<LiveInstance> instances)
+  {
+    List<LiveInstance> result = new ArrayList<LiveInstance>();
+    for (LiveInstance instance : instances)
+    {
+      result.add(new LiveInstance(instance.getRecord()));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
new file mode 100644
index 0000000..2bc09bf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
@@ -0,0 +1,59 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks the property-transfer service URL held by each participant's
+ * {@link ZKHelixDataAccessor} after the controller is disconnected and after a new
+ * controller is started.
+ */
+public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  private static final Logger LOG =
+      Logger.getLogger(TestZKPropertyTransferServer.class);
+
+  /**
+   * Disconnects controller "_0", asserts every participant has a null or empty service
+   * URL, restarts the controller and asserts every participant then holds the URL
+   * reported by {@link ZKPropertyTransferServer}.
+   *
+   * @throws Exception if restarting the controller or sleeping is interrupted
+   */
+  @Test
+  public void TestControllerChange() throws Exception
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _startCMResultMap.get(controllerName)._manager.disconnect();
+
+    Thread.sleep(1000);
+
+    // kill controller, participant should not know about the svc url
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      ZKHelixDataAccessor zkAccessor = participantAccessor(i);
+      Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
+                            || zkAccessor._zkPropertyTransferSvcUrl.equals(""),
+                        "Participant " + i + " still has a property transfer svc url");
+    }
+    _startCMResultMap.get(controllerName)._thread.interrupt();
+    _startCMResultMap.remove(controllerName);
+
+    StartCMResult startResult =
+        TestHelper.startController(CLUSTER_NAME,
+                                   controllerName,
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    _startCMResultMap.put(controllerName, startResult);
+
+    Thread.sleep(1000);
+
+    // create controller again, the svc url is notified to the participants
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      ZKHelixDataAccessor zkAccessor = participantAccessor(i);
+      // Explicit null check: a missing url must fail as an assertion, not as an NPE, and
+      // must not pass just because both sides happen to be null.
+      Assert.assertNotNull(zkAccessor._zkPropertyTransferSvcUrl,
+                           "Participant " + i + " was not notified of the svc url");
+      Assert.assertEquals(zkAccessor._zkPropertyTransferSvcUrl,
+                          ZKPropertyTransferServer.getInstance().getWebserviceUrl(),
+                          "Participant " + i + " holds a wrong svc url");
+    }
+  }
+
+  /**
+   * Looks up the data accessor of the participant at the given index (instance name is
+   * PARTICIPANT_PREFIX + "_" + (START_PORT + index)) and casts it to the ZK implementation.
+   */
+  private ZKHelixDataAccessor participantAccessor(int index)
+  {
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + index);
+    HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+    return (ZKHelixDataAccessor) accessor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
new file mode 100644
index 0000000..07fa06a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for the static helpers in {@link ZKUtil}, run against the ZooKeeper server at
+ * ZK_ADDR with a cluster created specifically for this class.
+ */
+public class TestZKUtil extends ZkUnitTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestZKUtil.class);
+
+  String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+  ZkClient _zkClient;
+
+  /**
+   * Opens the shared client, wipes any previous cluster tree, checks that
+   * {@code isClusterSetup} is false for the missing cluster and for null arguments, and
+   * finally creates an empty cluster for the tests.
+   */
+  @BeforeClass()
+  public void beforeClass() throws IOException, Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    if (_zkClient.exists("/" + clusterName))
+    {
+      _zkClient.deleteRecursive("/" + clusterName);
+    }
+
+    // Nothing is set up yet, and null inputs must be reported as "not set up" as well.
+    AssertJUnit.assertFalse(ZKUtil.isClusterSetup(clusterName, _zkClient));
+    AssertJUnit.assertFalse(ZKUtil.isClusterSetup(null, _zkClient));
+    AssertJUnit.assertFalse(ZKUtil.isClusterSetup(null, null));
+    AssertJUnit.assertFalse(ZKUtil.isClusterSetup(clusterName, null));
+
+    TestHelper.setupEmptyCluster(_zkClient, clusterName);
+  }
+
+  /** Closes the shared ZooKeeper client. */
+  @AfterClass()
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  /** The cluster created in {@link #beforeClass()} must be reported as set up. */
+  @Test()
+  public void testIsClusterSetup()
+  {
+    AssertJUnit.assertTrue(ZKUtil.isClusterSetup(clusterName, _zkClient));
+  }
+
+  /** Creates two children, reads them back, drops them, and drops with a null list. */
+  @Test()
+  public void testChildrenOperations()
+  {
+    String parentPath = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.PARTICIPANT.toString());
+
+    List<ZNRecord> toCreate = new ArrayList<ZNRecord>();
+    toCreate.add(new ZNRecord("id1"));
+    toCreate.add(new ZNRecord("id2"));
+    ZKUtil.createChildren(_zkClient, parentPath, toCreate);
+
+    List<ZNRecord> fetched = ZKUtil.getChildren(_zkClient, parentPath);
+    AssertJUnit.assertEquals(2, fetched.size());
+
+    ZKUtil.dropChildren(_zkClient, parentPath, fetched);
+    // Dropping a child that is already gone is exercised as well.
+    ZKUtil.dropChildren(_zkClient, parentPath, new ZNRecord("id1"));
+    List<ZNRecord> remaining = ZKUtil.getChildren(_zkClient, parentPath);
+    AssertJUnit.assertEquals(0, remaining.size());
+
+    ZKUtil.dropChildren(_zkClient, parentPath, (List<ZNRecord>) null);
+  }
+
+  /** updateIfExists must not create a missing node, and must write to an existing one. */
+  @Test()
+  public void testUpdateIfExists()
+  {
+    String nodePath = participantConfigPath("id3");
+    ZNRecord update = new ZNRecord("id4");
+
+    ZKUtil.updateIfExists(_zkClient, nodePath, update, false);
+    AssertJUnit.assertFalse(_zkClient.exists(nodePath));
+
+    _zkClient.createPersistent(nodePath);
+    ZKUtil.updateIfExists(_zkClient, nodePath, update, false);
+    AssertJUnit.assertTrue(_zkClient.exists(nodePath));
+
+    ZNRecord stored = _zkClient.<ZNRecord> readData(nodePath);
+    AssertJUnit.assertEquals("id4", stored.getId());
+  }
+
+  /** Subtracting a record from itself must remove its simple field from the stored node. */
+  @Test()
+  public void testSubtract()
+  {
+    String nodePath = participantConfigPath("id5");
+    ZNRecord original = new ZNRecord("id5");
+    original.setSimpleField("key1", "value1");
+    _zkClient.createPersistent(nodePath, original);
+
+    ZKUtil.subtract(_zkClient, nodePath, original);
+
+    ZNRecord stored = _zkClient.<ZNRecord> readData(nodePath);
+    AssertJUnit.assertNull(stored.getSimpleField("key1"));
+  }
+
+  /** createChildren is called with a null list; the test passes if nothing is thrown. */
+  @Test()
+  public void testNullChildren()
+  {
+    ZKUtil.createChildren(_zkClient, participantConfigPath("id6"), (List<ZNRecord>) null);
+  }
+
+  /** createOrUpdate on a fresh path must store the given record. */
+  @Test()
+  public void testCreateOrUpdate()
+  {
+    String nodePath = participantConfigPath("id7");
+    ZKUtil.createOrUpdate(_zkClient, nodePath, new ZNRecord("id7"), true, true);
+
+    ZNRecord stored = _zkClient.<ZNRecord> readData(nodePath);
+    AssertJUnit.assertEquals("id7", stored.getId());
+  }
+
+  /** createOrReplace must store the first record and then overwrite it with the second. */
+  @Test()
+  public void testCreateOrReplace()
+  {
+    String nodePath = participantConfigPath("id8");
+
+    ZKUtil.createOrReplace(_zkClient, nodePath, new ZNRecord("id8"), true);
+    ZNRecord stored = _zkClient.<ZNRecord> readData(nodePath);
+    AssertJUnit.assertEquals("id8", stored.getId());
+
+    ZKUtil.createOrReplace(_zkClient, nodePath, new ZNRecord("id9"), true);
+    stored = _zkClient.<ZNRecord> readData(nodePath);
+    AssertJUnit.assertEquals("id9", stored.getId());
+  }
+
+  /** Returns the PARTICIPANT-scope CONFIGS path of this cluster for the given node id. */
+  private String participantConfigPath(String id)
+  {
+    return PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.PARTICIPANT.toString(), id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
new file mode 100644
index 0000000..bf85bb2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -0,0 +1,328 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that records whose serialized form exceeds the size limit are rejected, both
+ * through {@link ZkClient} directly and through {@link ZKHelixDataAccessor}, and that a
+ * rejected write neither creates a node nor changes data that is already stored. The same
+ * scenario is run once per serializer implementation.
+ */
+public class TestZNRecordSizeLimit extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZNRecordSizeLimit.class);
+
+  /** Runs the size-limit scenario with {@link ZNRecordSerializer}. */
+  @Test
+  public void testZNRecordSizeLimitUseZNRecordSerializer()
+  {
+    verifySizeLimit("testZNRecordSizeLimitUseZNRecordSerializer",
+                    new ZNRecordSerializer(),
+                    "TestDB0",
+                    "TestDB1");
+  }
+
+  /** Runs the size-limit scenario with {@link ZNRecordStreamingSerializer}. */
+  @Test
+  public void testZNRecordSizeLimitUseZNRecordStreamingSerializer()
+  {
+    verifySizeLimit("testZNRecordSizeLimitUseZNRecordStreamingSerializer",
+                    new ZNRecordStreamingSerializer(),
+                    "TestDB_1",
+                    "TestDB_2");
+  }
+
+  /**
+   * Shared scenario for both serializers.
+   *
+   * @param testName          name printed in the START/END console markers
+   * @param serializer        serializer installed on the client and used to measure sizes
+   * @param oversizedResource ideal-state name used for the write that must be rejected
+   * @param legalResource     ideal-state name used for the write that must succeed and
+   *                          for the oversized update that must then be rejected
+   */
+  private void verifySizeLimit(String testName,
+                               org.I0Itec.zkclient.serialize.ZkSerializer serializer,
+                               String oversizedResource,
+                               String legalResource)
+  {
+    String className = getShortClassName();
+    System.out.println("START " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    try
+    {
+      zkClient.setZkSerializer(serializer);
+      String root = className;
+
+      // 1024 'a' characters; each simple field therefore carries roughly 1KB of payload.
+      char[] buf = new char[1024];
+      Arrays.fill(buf, 'a');
+      String bufStr = new String(buf);
+
+      // test zkClient
+      // legal-sized data gets written to zk
+      // write a znode of size less than 1m
+      final ZNRecord smallRecord = new ZNRecord("normalsize");
+      smallRecord.getSimpleFields().clear();
+      fillSimpleFields(smallRecord, 0, 900, bufStr);
+
+      String path1 = "/" + root + "/test1";
+      zkClient.createPersistent(path1, true);
+      zkClient.writeData(path1, smallRecord);
+
+      ZNRecord record = zkClient.readData(path1);
+      Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+      // oversized data doesn't create any data on zk
+      // prepare a znode of size larger than 1m
+      final ZNRecord largeRecord = new ZNRecord("oversize");
+      largeRecord.getSimpleFields().clear();
+      fillSimpleFields(largeRecord, 0, 1024, bufStr);
+
+      String path2 = "/" + root + "/test2";
+      zkClient.createPersistent(path2, true);
+      assertWriteRejected(zkClient, path2, largeRecord);
+      record = zkClient.readData(path2);
+      Assert.assertNull(record);
+
+      // oversized write doesn't overwrite existing data on zk
+      record = zkClient.readData(path1);
+      assertWriteRejected(zkClient, path1, largeRecord);
+      ZNRecord recordNew = zkClient.readData(path1);
+      byte[] arr = serializer.serialize(record);
+      byte[] arrNew = serializer.serialize(recordNew);
+      Assert.assertTrue(Arrays.equals(arr, arrNew));
+
+      // test ZkDataAccessor
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+      admin.addCluster(className, true);
+      InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+      admin.addInstance(className, instanceConfig);
+
+      // oversized data should not create any new data on zk
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(className, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      IdealState idealState = new IdealState("currentState");
+      resetIdealState(idealState, 0, 1024, bufStr);
+      boolean succeed =
+          accessor.setProperty(keyBuilder.idealStates(oversizedResource), idealState);
+      Assert.assertFalse(succeed);
+      // Read back the very key that was just rejected, so the check is not vacuous.
+      HelixProperty property =
+          accessor.getProperty(keyBuilder.idealStates(oversizedResource));
+      Assert.assertNull(property);
+
+      // legal sized data gets written to zk
+      resetIdealState(idealState, 0, 900, bufStr);
+      succeed = accessor.setProperty(keyBuilder.idealStates(legalResource), idealState);
+      Assert.assertTrue(succeed);
+      record = accessor.getProperty(keyBuilder.idealStates(legalResource)).getRecord();
+      Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+      // oversized data should not update existing data on zk
+      // (the extra fields 900..1023 are sent as an update on top of the stored 0..899)
+      resetIdealState(idealState, 900, 1024, bufStr);
+      succeed = accessor.updateProperty(keyBuilder.idealStates(legalResource), idealState);
+      Assert.assertFalse(succeed);
+      recordNew = accessor.getProperty(keyBuilder.idealStates(legalResource)).getRecord();
+      arr = serializer.serialize(record);
+      arrNew = serializer.serialize(recordNew);
+      Assert.assertTrue(Arrays.equals(arr, arrNew));
+    }
+    finally
+    {
+      // Always release the ZooKeeper connection, including when an assertion fails.
+      zkClient.close();
+    }
+
+    System.out.println("END " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Sets simple fields named {@code from}..{@code to - 1} (decimal strings) to
+   * {@code value} on the given record.
+   */
+  private static void fillSimpleFields(ZNRecord record, int from, int to, String value)
+  {
+    for (int i = from; i < to; i++)
+    {
+      record.setSimpleField(i + "", value);
+    }
+  }
+
+  /**
+   * Clears the ideal state's simple fields, re-applies the fixed state model / mode /
+   * partition settings used by this test, then adds payload fields {@code from}..{@code to - 1}.
+   */
+  private static void resetIdealState(IdealState idealState, int from, int to, String value)
+  {
+    idealState.getRecord().getSimpleFields().clear();
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setIdealStateMode("AUTO");
+    idealState.setNumPartitions(10);
+    fillSimpleFields(idealState.getRecord(), from, to, value);
+  }
+
+  /**
+   * Attempts the write and fails the test unless it is rejected with a
+   * {@link HelixException}.
+   */
+  private static void assertWriteRejected(ZkClient zkClient, String path, ZNRecord record)
+  {
+    try
+    {
+      zkClient.writeData(path, record);
+    }
+    catch (HelixException e)
+    {
+      // expected: the oversized record was refused
+      return;
+    }
+    Assert.fail("Should fail because data size is larger than 1M");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
new file mode 100644
index 0000000..031cedf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -0,0 +1,318 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Exercises {@link ZkBaseDataAccessor} against the ZooKeeper server at {@code ZK_ADDR}:
+ * once through its single-path methods and once through its list-based methods.
+ * Each test works on {@link #MSG_COUNT} message znodes below its own root, which is
+ * deleted recursively when the test starts.
+ */
+public class TestZkBaseDataAccessor extends ZkUnitTestBase
+{
+  /** Number of message znodes every test creates, modifies and removes. */
+  private static final int MSG_COUNT = 10;
+
+  /**
+   * Walks the single-path API: create, set, update, get, exists, getStat and remove.
+   * After each write the result is read back through the raw {@link ZkClient}.
+   */
+  @Test
+  public void testSyncZkBaseDataAccessor()
+  {
+    System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
+
+    String root = "TestZkBaseDataAccessor_syn";
+    String host = "host_0";
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    try
+    {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      zkClient.deleteRecursive("/" + root);
+
+      BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+
+      // test sync create
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean success =
+            accessor.create(msgPath(root, host, i), new ZNRecord(msgId(i)), AccessOption.PERSISTENT);
+        Assert.assertTrue(success, "Should succeed in create");
+      }
+
+      // test get what we created
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getId(), msgId(i), "Should get what we created");
+      }
+
+      // test sync set
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean success =
+            accessor.set(msgPath(root, host, i), recordWith(i, "key1", "value1"), AccessOption.PERSISTENT);
+        Assert.assertTrue(success, "Should succeed in set");
+      }
+
+      // test get what we set
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
+        Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+      }
+
+      // test sync update
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean success =
+            accessor.update(msgPath(root, host, i),
+                            new ZNRecordUpdater(recordWith(i, "key2", "value2")),
+                            AccessOption.PERSISTENT);
+        Assert.assertTrue(success, "Should succeed in update");
+      }
+
+      // test get what we updated (key1 from the earlier set is expected to survive)
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+        Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+      }
+
+      // test sync get
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = accessor.get(msgPath(root, host, i), null, 0);
+        Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+        Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+        Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+      }
+
+      // test sync exist
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean exists = accessor.exists(msgPath(root, host, i), 0);
+        Assert.assertTrue(exists, "Should exist");
+      }
+
+      // test getStat()
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        Stat stat = accessor.getStat(msgPath(root, host, i), 0);
+        Assert.assertNotNull(stat, "Stat should exist");
+        Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
+      }
+
+      // test sync remove
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean success = accessor.remove(msgPath(root, host, i), 0);
+        Assert.assertTrue(success, "Should remove");
+      }
+
+      // test get what we removed
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean exists = zkClient.exists(msgPath(root, host, i));
+        Assert.assertFalse(exists, "Should be removed");
+      }
+    }
+    finally
+    {
+      // close the connection even when an assertion above fails
+      zkClient.close();
+    }
+    System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Walks the list-based API: createChildren, setChildren, updateChildren, getChildren,
+   * exists, getStats and remove, each applied to all {@link #MSG_COUNT} paths at once.
+   */
+  @Test
+  public void testAsyncZkBaseDataAccessor()
+  {
+    System.out.println("START TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
+
+    String root = "TestZkBaseDataAccessor_asyn";
+    String host = "host_1";
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    try
+    {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      zkClient.deleteRecursive("/" + root);
+
+      ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+
+      // test async createChildren
+      List<String> paths = msgPaths(root, host);
+      List<ZNRecord> records = new ArrayList<ZNRecord>();
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        records.add(new ZNRecord(msgId(i)));
+      }
+      boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in create ");
+
+      // test get what we created
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getId(), msgId(i), "Should get what we created");
+      }
+
+      // test async setChildren
+      paths = msgPaths(root, host);
+      records = new ArrayList<ZNRecord>();
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        records.add(recordWith(i, "key1", "value1"));
+      }
+      success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in set ");
+
+      // test get what we set
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
+        Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+      }
+
+      // test async updateChildren
+      paths = msgPaths(root, host);
+      List<DataUpdater<ZNRecord>> znrecordUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        znrecordUpdaters.add(new ZNRecordUpdater(recordWith(i, "key2", "value2")));
+      }
+      success = accessor.updateChildren(paths, znrecordUpdaters, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in update ");
+
+      // test get what we updated
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = zkClient.readData(msgPath(root, host, i));
+        Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+        Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+      }
+
+      // test async getChildren
+      // NOTE(review): the loop below assumes getChildren() returns msg_0..msg_9 in index
+      // order; confirm the accessor guarantees an ordering, otherwise compare by id.
+      String parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, host);
+      records = accessor.getChildren(parentPath, null, 0);
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        ZNRecord record = records.get(i);
+        Assert.assertEquals(record.getId(), msgId(i), "Should get what we updated");
+        Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+        Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+      }
+
+      // test async exists
+      boolean[] exists = accessor.exists(msgPaths(root, host), 0);
+      assertAllTrue(exists, "Should exist ");
+
+      // test async getStats
+      Stat[] stats = accessor.getStats(msgPaths(root, host), 0);
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        Assert.assertNotNull(stats[i], "Stat should exist for " + msgId(i));
+        Assert.assertEquals(stats[i].getVersion(),
+                            2,
+                            "DataVersion should be 2, since we set 1 and update 1 for " + msgId(i));
+      }
+
+      // test async remove
+      success = accessor.remove(msgPaths(root, host), 0);
+      assertAllTrue(success, "Should succeed in remove ");
+
+      // test get what we removed
+      for (int i = 0; i < MSG_COUNT; i++)
+      {
+        boolean pathExists = zkClient.exists(msgPath(root, host, i));
+        Assert.assertFalse(pathExists, "Should be removed " + msgId(i));
+      }
+    }
+    finally
+    {
+      // close the connection even when an assertion above fails
+      zkClient.close();
+    }
+    System.out.println("END TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
+  }
+
+  /** Returns the id of the i-th test message, i.e. {@code "msg_" + i}. */
+  private static String msgId(int i)
+  {
+    return "msg_" + i;
+  }
+
+  /** Returns the MESSAGES path of the i-th test message for the given root and host. */
+  private static String msgPath(String root, String host, int i)
+  {
+    return PropertyPathConfig.getPath(PropertyType.MESSAGES, root, host, msgId(i));
+  }
+
+  /** Returns a fresh list holding the MESSAGES paths of all test messages, in index order. */
+  private static List<String> msgPaths(String root, String host)
+  {
+    List<String> paths = new ArrayList<String>();
+    for (int i = 0; i < MSG_COUNT; i++)
+    {
+      paths.add(msgPath(root, host, i));
+    }
+    return paths;
+  }
+
+  /** Returns a new record for the i-th test message carrying exactly one simple field. */
+  private static ZNRecord recordWith(int i, String key, String value)
+  {
+    ZNRecord record = new ZNRecord(msgId(i));
+    record.setSimpleField(key, value);
+    return record;
+  }
+
+  /** Asserts that the first MSG_COUNT flags are true; failures read messagePrefix + message id. */
+  private static void assertAllTrue(boolean[] flags, String messagePrefix)
+  {
+    for (int i = 0; i < MSG_COUNT; i++)
+    {
+      Assert.assertTrue(flags[i], messagePrefix + msgId(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
new file mode 100644
index 0000000..55cdc70
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
@@ -0,0 +1,382 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that the cache held by {@link ZkCacheBaseDataAccessor} agrees with ZooKeeper
+ * (via {@code TestHelper.verifyZkCache}) after list-based create/update/set/remove calls.
+ * One test issues the writes through a separate accessor, the other through the caching
+ * accessor itself.
+ */
+public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase
+{
+  /**
+   * Writes through a second, independent {@link ZkBaseDataAccessor} and, after a short
+   * sleep following each batch, verifies the caching accessor's cache against ZooKeeper.
+   */
+  @Test
+  public void testHappyPathExtOpZkCacheBaseDataAccessor() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init external base data accessor
+    ZkClient extZkclient = new ZkClient(ZK_ADDR);
+    try
+    {
+      extZkclient.setZkSerializer(new ZNRecordSerializer());
+      ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
+          new ZkBaseDataAccessor<ZNRecord>(extZkclient);
+
+      // init zkCacheBaseDataAccessor
+      String curStatePath =
+          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                     clusterName,
+                                     "localhost_8901");
+      String extViewPath =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+      ZkBaseDataAccessor<ZNRecord> baseAccessor =
+          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+      extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+      List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
+      ZkCacheBaseDataAccessor<ZNRecord> accessor =
+          new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
+
+      assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+      // create 10 current states using external base accessor
+      List<String> paths = new ArrayList<String>();
+      List<ZNRecord> records = new ArrayList<ZNRecord>();
+      for (int i = 0; i < 10; i++)
+      {
+        paths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                             clusterName,
+                                             "localhost_8901",
+                                             "session_0",
+                                             "TestDB" + i));
+        records.add(new ZNRecord("TestDB" + i));
+      }
+
+      boolean[] success =
+          extBaseAccessor.createChildren(paths, records, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in create: ", paths);
+
+      // wait zkEventThread update zkCache
+      Thread.sleep(100);
+      assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+      // update each current state 10 times by external base accessor
+      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+      for (int j = 0; j < 10; j++)
+      {
+        paths.clear();
+        updaters.clear();
+        for (int i = 0; i < 10; i++)
+        {
+          ZNRecord newRecord = new ZNRecord("TestDB" + i);
+          newRecord.setSimpleField("" + j, "" + j);
+          paths.add(curStatePath + "/session_0/TestDB" + i);
+          updaters.add(new ZNRecordUpdater(newRecord));
+        }
+        success = extBaseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+        assertAllTrue(success, "Should succeed in update: ", paths);
+      }
+
+      // wait zkEventThread update zkCache
+      Thread.sleep(100);
+      assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+      // set 10 external views by external accessor
+      paths.clear();
+      records.clear();
+      for (int i = 0; i < 10; i++)
+      {
+        paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+        records.add(new ZNRecord("TestDB" + i));
+      }
+      success = extBaseAccessor.setChildren(paths, records, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in set: ", paths);
+
+      // wait zkEventThread update zkCache
+      Thread.sleep(100);
+      assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+      // remove 10 external views by external accessor
+      paths.clear();
+      for (int i = 0; i < 10; i++)
+      {
+        paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+      }
+      success = extBaseAccessor.remove(paths, 0);
+      assertAllTrue(success, "Should succeed in remove: ", paths);
+
+      // wait zkEventThread update zkCache
+      Thread.sleep(100);
+      assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+    }
+    finally
+    {
+      // clean up, also when an assertion above fails
+      extZkclient.close();
+    }
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Writes through the caching accessor itself and verifies its cache right after each
+   * batch (no sleep), then reads back through get, getChildren and exists.
+   */
+  @Test
+  public void testHappyPathSelfOpZkCacheBaseDataAccessor() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init zkCacheDataAccessor
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                   clusterName,
+                                   "localhost_8901");
+    String extViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+    List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
+    ZkCacheBaseDataAccessor<ZNRecord> accessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
+
+    assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+    // create 10 current states using this accessor
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    for (int i = 0; i < 10; i++)
+    {
+      paths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                           clusterName,
+                                           "localhost_8901",
+                                           "session_0",
+                                           "TestDB" + i));
+      records.add(new ZNRecord("TestDB" + i));
+    }
+
+    boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+    assertAllTrue(success, "Should succeed in create: ", paths);
+
+    // verify cache (this is the only check that passes false as the last argument)
+    assertCacheMatchesZk(zkCacheInitPaths, accessor, false);
+
+    // update each current state 10 times by this accessor
+    List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+    for (int j = 0; j < 10; j++)
+    {
+      paths.clear();
+      updaters.clear();
+      for (int i = 0; i < 10; i++)
+      {
+        ZNRecord newRecord = new ZNRecord("TestDB" + i);
+        newRecord.setSimpleField("" + j, "" + j);
+        paths.add(curStatePath + "/session_0/TestDB" + i);
+        updaters.add(new ZNRecordUpdater(newRecord));
+      }
+      success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in update: ", paths);
+    }
+
+    // verify cache
+    // NOTE(review): unlike every other check in this class, this call uses the
+    // five-argument verifyZkCache and passes zkCacheInitPaths twice; confirm that the
+    // second list is meant to equal the first.
+    boolean ret =
+        TestHelper.verifyZkCache(zkCacheInitPaths,
+                                 zkCacheInitPaths,
+                                 accessor._zkCache._cache,
+                                 _gZkClient,
+                                 true);
+    Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+    // set 10 external views 10 times by this accessor
+    for (int j = 0; j < 10; j++)
+    {
+      // start every round from empty lists so each setChildren call carries exactly
+      // 10 entries instead of an ever-growing list of duplicates
+      paths.clear();
+      records.clear();
+      for (int i = 0; i < 10; i++)
+      {
+        ZNRecord record = new ZNRecord("TestDB" + i);
+        record.setSimpleField("setKey", "" + j);
+
+        paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+        records.add(record);
+      }
+      success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+      assertAllTrue(success, "Should succeed in set: ", paths);
+    }
+
+    // verify cache
+    assertCacheMatchesZk(zkCacheInitPaths, accessor, true);
+
+    // get 10 external views
+    paths.clear();
+    for (int i = 0; i < 10; i++)
+    {
+      paths.add(extViewPath + "/TestDB" + i);
+    }
+
+    records = accessor.get(paths, null, 0);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+    }
+
+    // getChildren
+    // NOTE(review): assumes the children come back ordered TestDB0..TestDB9; confirm
+    // the accessor guarantees an ordering.
+    records = accessor.getChildren(extViewPath, null, 0);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+    }
+
+    // exists
+    paths.clear();
+    for (int i = 0; i < 10; i++)
+    {
+      paths.add(curStatePath + "/session_0/TestDB" + i);
+    }
+    success = accessor.exists(paths, 0);
+    assertAllTrue(success, "Should exist: ", paths);
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Runs the four-argument {@code TestHelper.verifyZkCache} for the given accessor's cache
+   * against {@code _gZkClient} and fails the test when it returns false.
+   *
+   * @param cachePaths paths handed to verifyZkCache as its first argument
+   * @param accessor   caching accessor whose {@code _zkCache._cache} is checked
+   * @param verifyFlag passed through unchanged as verifyZkCache's last argument
+   */
+  private void assertCacheMatchesZk(List<String> cachePaths,
+                                    ZkCacheBaseDataAccessor<ZNRecord> accessor,
+                                    boolean verifyFlag)
+  {
+    boolean ret =
+        TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, verifyFlag);
+    Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+  }
+
+  /** Asserts the first 10 flags are true; a failure reads messagePrefix + the path at that index. */
+  private static void assertAllTrue(boolean[] flags, String messagePrefix, List<String> paths)
+  {
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(flags[i], messagePrefix + paths.get(i));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
new file mode 100644
index 0000000..98c6cc3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
@@ -0,0 +1,209 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyListener;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks which {@link HelixPropertyListener} callbacks a subscriber of
+ * {@link ZkCacheBaseDataAccessor} receives when current-state znodes are created, updated
+ * and removed one path at a time, and that the cache agrees with ZooKeeper after each phase.
+ */
+public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase
+{
+  /** Listener that records the path of every callback in one queue per callback type. */
+  class TestListener implements HelixPropertyListener
+  {
+    ConcurrentLinkedQueue<String> _deletePathQueue = new ConcurrentLinkedQueue<String>();
+    ConcurrentLinkedQueue<String> _createPathQueue = new ConcurrentLinkedQueue<String>();
+    ConcurrentLinkedQueue<String> _changePathQueue = new ConcurrentLinkedQueue<String>();
+
+    @Override
+    public void onDataDelete(String path)
+    {
+      _deletePathQueue.add(path);
+    }
+
+    @Override
+    public void onDataCreate(String path)
+    {
+      _createPathQueue.add(path);
+    }
+
+    @Override
+    public void onDataChange(String path)
+    {
+      _changePathQueue.add(path);
+    }
+
+    /** Empties all three queues so the next phase starts counting from zero. */
+    public void reset()
+    {
+      _deletePathQueue.clear();
+      _createPathQueue.clear();
+      _changePathQueue.clear();
+    }
+  }
+
+  /**
+   * Creates 10 current states through a separate accessor, then updates each one 10 times
+   * and removes them through the caching accessor. After each phase it waits 500 ms, then
+   * checks the cache and the exact set of callback paths the listener received.
+   */
+  @Test
+  public void testZkCacheCallbackExternalOpNoChroot() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init external base data accessor
+    ZkClient zkclient = new ZkClient(ZK_ADDR);
+    try
+    {
+      zkclient.setZkSerializer(new ZNRecordSerializer());
+      ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
+          new ZkBaseDataAccessor<ZNRecord>(zkclient);
+
+      // init zkCacheDataAccessor
+      String curStatePath =
+          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                     clusterName,
+                                     "localhost_8901");
+      String extViewPath =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+      ZkBaseDataAccessor<ZNRecord> baseAccessor =
+          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+      extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+      List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+      ZkCacheBaseDataAccessor<ZNRecord> accessor =
+          new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, cachePaths);
+
+      TestListener listener = new TestListener();
+      accessor.subscribe(curStatePath, listener);
+
+      // create 10 current states
+      List<String> createPaths = new ArrayList<String>();
+      for (int i = 0; i < 10; i++)
+      {
+        String path = curStatePath + "/session_0/TestDB" + i;
+        createPaths.add(path);
+        boolean success =
+            extBaseAccessor.create(path,
+                                   new ZNRecord("TestDB" + i),
+                                   AccessOption.PERSISTENT);
+        Assert.assertTrue(success, "Should succeed in create: " + path);
+      }
+
+      Thread.sleep(500);
+
+      // verify cache
+      assertCacheMatchesZk(cachePaths, accessor);
+      System.out.println("createCnt: " + listener._createPathQueue.size());
+      Assert.assertEquals(listener._createPathQueue.size(),
+                          11,
+                          "Shall get 11 onCreate callbacks.");
+
+      // verify each callback path; the 11th callback is expected for the session_0 parent
+      createPaths.add(curStatePath + "/session_0");
+      assertSamePaths(listener._createPathQueue, createPaths, "create");
+
+      // update each current state, single thread
+      List<String> updatePaths = new ArrayList<String>();
+      listener.reset();
+      for (int i = 0; i < 10; i++)
+      {
+        String path = curStatePath + "/session_0/TestDB" + i;
+        for (int j = 0; j < 10; j++)
+        {
+          updatePaths.add(path);
+          ZNRecord newRecord = new ZNRecord("TestDB" + i);
+          newRecord.setSimpleField("" + j, "" + j);
+          boolean success =
+              accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+          Assert.assertTrue(success, "Should succeed in update: " + path);
+        }
+      }
+      Thread.sleep(500);
+
+      // verify cache
+      assertCacheMatchesZk(cachePaths, accessor);
+      System.out.println("changeCnt: " + listener._changePathQueue.size());
+      Assert.assertEquals(listener._changePathQueue.size(),
+                          100,
+                          "Shall get 100 onChange callbacks.");
+
+      // verify each callback path
+      assertSamePaths(listener._changePathQueue, updatePaths, "change");
+
+      // remove 10 current states
+      List<String> removePaths = new ArrayList<String>();
+      listener.reset();
+      for (int i = 0; i < 10; i++)
+      {
+        String path = curStatePath + "/session_0/TestDB" + i;
+        removePaths.add(path);
+        boolean success = accessor.remove(path, AccessOption.PERSISTENT);
+        Assert.assertTrue(success, "Should succeed in remove: " + path);
+      }
+      Thread.sleep(500);
+
+      // verify cache
+      assertCacheMatchesZk(cachePaths, accessor);
+      System.out.println("deleteCnt: " + listener._deletePathQueue.size());
+      Assert.assertEquals(listener._deletePathQueue.size(),
+                          10,
+                          "Shall get 10 onDelete callbacks.");
+
+      // verify each callback path
+      assertSamePaths(listener._deletePathQueue, removePaths, "remove");
+    }
+    finally
+    {
+      // the sibling cache tests close their own client; do the same here, on failure too
+      zkclient.close();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Runs {@code TestHelper.verifyZkCache} (last argument true) for the accessor's cache
+   * against {@code _gZkClient} and fails the test when it returns false.
+   */
+  private void assertCacheMatchesZk(List<String> cachePaths,
+                                    ZkCacheBaseDataAccessor<ZNRecord> accessor)
+  {
+    boolean ret =
+        TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
+    Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+  }
+
+  /**
+   * Asserts that the recorded callback paths equal the expected paths once both are sorted.
+   *
+   * @param callbackQueue paths recorded by the listener; copied, not modified
+   * @param expectedPaths paths the test operated on; sorted in place
+   * @param kind          word used in the failure message ("create", "change" or "remove")
+   */
+  private static void assertSamePaths(ConcurrentLinkedQueue<String> callbackQueue,
+                                      List<String> expectedPaths,
+                                      String kind)
+  {
+    List<String> callbackPaths = new ArrayList<String>(callbackQueue);
+    Collections.sort(expectedPaths);
+    Collections.sort(callbackPaths);
+    Assert.assertEquals(callbackPaths,
+                        expectedPaths,
+                        "Should get " + kind + " callbacks at " + expectedPaths + ", but was "
+                            + callbackPaths);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
new file mode 100644
index 0000000..4054679
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.store.PropertyStore;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZkClusterManager extends ZkUnitTestBase
+{ // Exercises ZKHelixManager (built with ZK_ADDR) in the CONTROLLER and ADMINISTRATOR instance types.
+  final String className = getShortClassName(); // reused in cluster names and START/END log lines
+  // Controller type: connect before/after cluster setup, controller listener, property store, disconnect.
+  @Test()
+  public void testController() throws Exception
+  {
+    System.out.println("START " + className + ".testController() at " + new Date(System.currentTimeMillis()));
+    final String clusterName = CLUSTER_PREFIX + "_" + className + "_controller";
+
+    // start clean: delete the cluster znode if it already exists
+    if (_gZkClient.exists("/" + clusterName))
+    {
+      _gZkClient.deleteRecursive("/" + clusterName);
+    }
+
+    ZKHelixManager controller = new ZKHelixManager(clusterName, null,
+                                                   InstanceType.CONTROLLER,
+                                                   ZK_ADDR);
+    try
+    {
+      controller.connect();
+      Assert.fail("Should throw HelixException if initial cluster structure is not setup");
+    } catch (HelixException e)
+    {
+      // OK: connect() is expected to be rejected because the cluster has not been set up yet
+    }
+
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+    controller.connect();
+    AssertJUnit.assertTrue(controller.isConnected());
+    controller.connect(); // a second connect() on a connected manager must leave it connected
+    AssertJUnit.assertTrue(controller.isConnected());
+
+    MockListener listener = new MockListener();
+    listener.reset();
+
+    try
+    {
+      controller.addControllerListener(null);
+      Assert.fail("Should throw HelixException");
+    } catch (HelixException e)
+    {
+      // OK: a null listener is expected to be rejected
+    }
+
+    controller.addControllerListener(listener);
+    AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked); // flag must already be set right after registration
+    controller.removeListener(listener);
+
+    PropertyStore<ZNRecord> store = controller.getPropertyStore();
+    ZNRecord record = new ZNRecord("node_1");
+    store.setProperty("node_1", record);
+    record = store.getProperty("node_1");
+    AssertJUnit.assertEquals("node_1", record.getId()); // write/read round trip through the property store
+    // call the remaining getters; their return values are not checked here
+    controller.getMessagingService();
+    controller.getHealthReportCollector();
+    controller.getClusterManagmentTool();
+
+    controller.handleNewSession();
+    controller.disconnect();
+    AssertJUnit.assertFalse(controller.isConnected());
+
+    System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
+  }
+  // Administrator type: connect, then write and read back a partition-scoped config through HelixAdmin.
+  @Test()
+  public void testAdministrator() throws Exception
+  {
+    System.out.println("START " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
+    final String clusterName = CLUSTER_PREFIX + "_" + className + "_admin";
+
+    // start clean: delete the cluster znode if it already exists
+    if (_gZkClient.exists("/" + clusterName))
+    {
+      _gZkClient.deleteRecursive("/" + clusterName);
+    }
+
+    ZKHelixManager admin = new ZKHelixManager(clusterName, null,
+                                              InstanceType.ADMINISTRATOR,
+                                              ZK_ADDR);
+
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+    admin.connect();
+    AssertJUnit.assertTrue(admin.isConnected());
+
+    HelixAdmin adminTool = admin.getClusterManagmentTool();
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
+        .forResource("testResource").forPartition("testPartition").build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("pKey1", "pValue1");
+    properties.put("pKey2", "pValue2");
+    adminTool.setConfig(scope, properties);
+    // read both keys back from the same scope and compare with what was written
+    properties = adminTool.getConfig(scope, TestHelper.setOf("pKey1", "pKey2"));
+    Assert.assertEquals(properties.size(), 2);
+    Assert.assertEquals(properties.get("pKey1"), "pValue1");
+    Assert.assertEquals(properties.get("pKey2"), "pValue2");
+
+    admin.disconnect();
+    AssertJUnit.assertFalse(admin.isConnected());
+
+    System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
new file mode 100644
index 0000000..c520b68
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -0,0 +1,198 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZkHelixAdmin extends ZkUnitTestBase
+{ // Walks ZKHelixAdmin (built on _gZkClient) through cluster, instance, state-model, resource and config calls.
+  @Test()
+  public void testZkHelixAdmin()
+  {
+    System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
+
+    final String clusterName = getShortClassName();
+    if (_gZkClient.exists("/" + clusterName))
+    {
+      _gZkClient.deleteRecursive("/" + clusterName); // start clean: drop the cluster znode if present
+    }
+
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    tool.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+    tool.addCluster(clusterName, true); // adding again with the flag set to true must not throw
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+
+    List<String> list = tool.getClusters();
+    AssertJUnit.assertTrue(list.size() > 0);
+
+    try
+    {
+      tool.addCluster(clusterName, false);
+      Assert.fail("should fail if add an already existing cluster");
+    } catch (HelixException e)
+    {
+      // OK: with the flag set to false, adding an existing cluster is expected to throw
+    }
+    // instance lifecycle: add, enable, look up, drop, then verify calls on the dropped instance fail
+    InstanceConfig config = new InstanceConfig("host1_9999");
+    config.setHostName("host1");
+    config.setPort("9999");
+    tool.addInstance(clusterName, config);
+    tool.enableInstance(clusterName, "host1_9999", true);
+    String path = PropertyPathConfig.getPath(PropertyType.INSTANCES,
+        clusterName, "host1_9999");
+    AssertJUnit.assertTrue(_gZkClient.exists(path)); // the instance path must now exist in ZooKeeper
+
+    try
+    {
+      tool.addInstance(clusterName, config);
+      Assert.fail("should fail if add an alredy-existing instance");
+    } catch (HelixException e)
+    {
+      // OK: adding the same instance twice is expected to throw
+    }
+    config = tool.getInstanceConfig(clusterName, "host1_9999");
+    AssertJUnit.assertEquals(config.getId(), "host1_9999");
+
+    tool.dropInstance(clusterName, config);
+    try
+    {
+      tool.getInstanceConfig(clusterName, "host1_9999");
+      Assert.fail("should fail if get a non-existent instance");
+    } catch (HelixException e)
+    {
+      // OK: the instance was dropped above
+    }
+    try
+    {
+      tool.dropInstance(clusterName, config);
+      Assert.fail("should fail if drop on a non-existent instance");
+    } catch (HelixException e)
+    {
+      // OK: dropping an already-dropped instance is expected to throw
+    }
+    try
+    {
+      tool.enableInstance(clusterName, "host1_9999", false);
+      Assert.fail("should fail if enable a non-existent instance");
+    } catch (HelixException e)
+    {
+      // OK: enable/disable on a dropped instance is expected to throw
+    }
+    ZNRecord stateModelRecord = new ZNRecord("id1"); // record carries only an id; nothing else is set on it
+    try
+    {
+      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
+          stateModelRecord));
+      path = PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
+          clusterName, "id1");
+      AssertJUnit.assertTrue(_gZkClient.exists(path));
+      Assert.fail("should fail"); // reached only if no HelixException was thrown above
+    } catch (HelixException e)
+    {
+      // OK: a HelixException is expected here. NOTE(review): presumably because the record is empty - confirm
+    }
+    try
+    {
+      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
+          stateModelRecord));
+      Assert.fail("should fail if add an already-existing state model");
+    } catch (HelixException e)
+    {
+      // OK: the second attempt with the same record is expected to throw as well
+    }
+    list = tool.getStateModelDefs(clusterName);
+    AssertJUnit.assertEquals(list.size(), 0); // neither attempt may have registered a definition
+
+    try
+    {
+      tool.addResource(clusterName, "resource", 10,
+          "nonexistStateModelDef");
+      Assert
+          .fail("should fail if add a resource without an existing state model");
+    } catch (HelixException e)
+    {
+      // OK: the referenced state model definition was never added
+    }
+    try
+    {
+      tool.addResource(clusterName, "resource", 10, "id1");
+      Assert.fail("should fail");
+    } catch (HelixException e)
+    {
+      // OK: "id1" is not in the cluster either (getStateModelDefs returned an empty list above)
+    }
+    list = tool.getResourcesInCluster(clusterName);
+    AssertJUnit.assertEquals(list.size(), 0); // no resource may have been created by the failed calls
+    try
+    {
+      tool.addResource(clusterName, "resource", 10, "id1");
+      Assert.fail("should fail");
+    } catch (HelixException e)
+    {
+      // OK: repeating the same call is expected to throw again
+    }
+    list = tool.getResourcesInCluster(clusterName);
+    AssertJUnit.assertEquals(list.size(), 0);
+
+    ExternalView resourceExternalView = tool.getResourceExternalView(
+        clusterName, "resource");
+    AssertJUnit.assertNull(resourceExternalView); // "resource" was never created, so no external view is expected
+
+    // test config support
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
+        .forResource("testResource").forPartition("testPartition").build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("pKey1", "pValue1");
+    properties.put("pKey2", "pValue2");
+
+    // make sure calling set/getConfig() many times will not drain zkClient resources
+    // int nbOfZkClients = ZkClient.getNumberOfConnections();
+    for (int i = 0; i < 100; i++)
+    {
+      tool.setConfig(scope, properties);
+      Map<String, String> newProperties = tool.getConfig(scope, properties.keySet());
+      Assert.assertEquals(newProperties.size(), 2);
+      Assert.assertEquals(newProperties.get("pKey1"), "pValue1");
+      Assert.assertEquals(newProperties.get("pKey2"), "pValue2");
+    }
+    // Assert.assertTrue(ZkClient.getNumberOfConnections() - nbOfZkClients < 5);
+
+    System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
new file mode 100644
index 0000000..adda6cb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.model.Message;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestAsyncCallback
+{ // Checks AsyncCallback's done / timed-out bookkeeping with and without a running timer.
+  class AsyncCallbackSample extends AsyncCallback
+  {
+    int _onTimeOutCalled = 0;
+    int _onReplyMessageCalled = 0;
+    @Override
+    public void onTimeOut()
+    {
+      // count invocations so the test can assert how often the timeout hook ran
+      _onTimeOutCalled ++;
+    }
+
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      _onReplyMessageCalled++;
+    }
+  }
+  // Three scenarios: no timer, timer expiring with one reply missing, all replies arriving before the timeout.
+  @Test()
+  public void testAsyncCallback() throws Exception
+  {
+    System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
+    AsyncCallbackSample callback = new AsyncCallbackSample();
+    AssertJUnit.assertFalse(callback.isInterrupted());
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 0);
+    // scenario 1: no timer started; the callback must be done only after one reply per sent message
+    int nMsgs = 5;
+
+    List<Message> messageSent = new ArrayList<Message>();
+    for(int i = 0;i < nMsgs; i++)
+    {
+      messageSent.add(new Message("Test", UUID.randomUUID().toString()));
+    }
+
+    callback.setMessagesSent(messageSent);
+
+    for(int i = 0;i < nMsgs; i++)
+    {
+      AssertJUnit.assertFalse(callback.isDone());
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    AssertJUnit.assertTrue(callback.isDone());
+    AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
+    // scenario 2: 1000 ms timeout, only nMsgs - 1 replies; the timer must fire exactly once
+    sleep(50);
+    callback = new AsyncCallbackSample();
+    callback.setMessagesSent(messageSent);
+    callback.setTimeout(1000);
+    sleep(50);
+    callback.startTimer();
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    for(int i = 0;i < nMsgs - 1; i++)
+    {
+      sleep(50);
+      AssertJUnit.assertFalse(callback.isDone());
+      AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    sleep(1000);
+    AssertJUnit.assertTrue(callback.isTimedOut());
+    AssertJUnit.assertTrue(callback._onTimeOutCalled == 1 );
+    AssertJUnit.assertFalse(callback.isDone());
+    // scenario 3: 1000 ms timeout, all replies arrive in time; no timeout may be reported afterwards
+    callback = new AsyncCallbackSample();
+    callback.setMessagesSent(messageSent);
+    callback.setTimeout(1000);
+    callback.startTimer();
+    sleep(50);
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    for(int i = 0;i < nMsgs; i++)
+    {
+      AssertJUnit.assertFalse(callback.isDone());
+      sleep(50);
+      AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    AssertJUnit.assertTrue(callback.isDone());
+    sleep(1300); // wait past the 1000 ms timeout to prove the timer no longer fires
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
+    System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
+  }
+  // Sleeps for 'time' milliseconds; if interrupted, logs it, restores the interrupt flag and returns early.
+  void sleep(int time)
+  {
+    try
+    {
+      Thread.sleep(time);
+    }
+    catch(InterruptedException e)
+    {
+      System.out.println(e);
+      Thread.currentThread().interrupt(); // do not swallow the interrupt: keep it visible to the caller/test runner
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
new file mode 100644
index 0000000..7908ab2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.AsyncCallbackService;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.TestHelixTaskExecutor.MockClusterManager;
+import org.apache.helix.model.Message;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAsyncCallbackSvc
+{ // Unit test for AsyncCallbackService: rejection of unusable messages in createHandler() and reply dispatch.
+  class MockHelixManager extends Mocks.MockManager
+  {
+    public String getSessionId()
+    {
+      return "123";
+    }
+  }
+  // Callback that records the id of every reply message handed to it.
+  class TestAsyncCallback extends AsyncCallback
+  {
+    HashSet<String> _repliedMessageId = new HashSet<String>();
+    @Override
+    public void onTimeOut()
+    {
+      // intentionally empty: this test never starts the callback timer
+    }
+
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      // remember which reply reached this callback so the test can assert on it
+      _repliedMessageId.add(message.getMsgId());
+    }
+  }
+  @Test(groups =
+  { "unitTest" })
+  public void testAsyncCallbackSvc() throws Exception
+  {
+    AsyncCallbackService svc = new AsyncCallbackService();
+    HelixManager manager = new MockHelixManager();
+    NotificationContext changeContext = new NotificationContext(manager);
+    // case 1: message of the service's own type, but no correlation id is set on it
+    Message msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+    msg.setTgtSessionId(manager.getSessionId());
+    try
+    {
+      svc.createHandler(msg, changeContext);
+      Assert.fail("Should throw HelixException for a message without correlation id");
+    }
+    catch(HelixException e)
+    {
+      AssertJUnit.assertTrue(e.getMessage().indexOf(msg.getMsgId())!= -1);
+    }
+    Message msg2 = new Message("RandomType", UUID.randomUUID().toString()); // case 2: foreign message type
+    msg2.setTgtSessionId(manager.getSessionId());
+    try
+    {
+      svc.createHandler(msg2, changeContext);
+      Assert.fail("Should throw HelixException for a message of an unexpected type");
+    }
+    catch(HelixException e)
+    {
+      AssertJUnit.assertTrue(e.getMessage().indexOf(msg2.getMsgId())!= -1);
+    }
+    Message msg3 = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+    msg3.setTgtSessionId(manager.getSessionId());
+    msg3.setCorrelationId("wfwegw"); // case 3: correlation id for which no callback has been registered
+    try
+    {
+      svc.createHandler(msg3, changeContext);
+      Assert.fail("Should throw HelixException for a correlation id without registered callback");
+    }
+    catch(HelixException e)
+    {
+      AssertJUnit.assertTrue(e.getMessage().indexOf(msg3.getMsgId())!= -1);
+    }
+    // happy path: register twice under one id; the assertions below expect the second registration to get the reply
+    TestAsyncCallback callback = new TestAsyncCallback();
+    String corrId = UUID.randomUUID().toString();
+    svc.registerAsyncCallback(corrId, new TestAsyncCallback());
+    svc.registerAsyncCallback(corrId, callback);
+
+    List<Message> msgSent = new ArrayList<Message>();
+    msgSent.add(new Message("Test", UUID.randomUUID().toString()));
+    callback.setMessagesSent(msgSent);
+
+    msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+    msg.setTgtSessionId("*");
+    msg.setCorrelationId(corrId);
+
+    MessageHandler aHandler = svc.createHandler(msg, changeContext);
+    aHandler.handleMessage();
+    // one message was sent and one reply handled, so the callback must be done and must have seen this reply
+    AssertJUnit.assertTrue(callback.isDone());
+    AssertJUnit.assertTrue(callback._repliedMessageId.contains(msg.getMsgId()));
+  }
+}