You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[41/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
new file mode 100644
index 0000000..1012402
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
@@ -0,0 +1,537 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+// TODO need to write performance test for zk-property store
+public class TestZKPropertyStore extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZKPropertyStore.class);
+  final String className = getShortClassName();
+  final int bufSize = 128;
+  final int mapNr = 10;
+  final int firstLevelNr = 10;
+  final int secondLevelNr = 10;
+  final int totalNodes = firstLevelNr * secondLevelNr;
+
+  class TestListener implements PropertyChangeListener<ZNRecord>
+  {
+    Map<String, String> _keySet;
+
+    public TestListener(Map<String, String> keySet)
+    {
+      _keySet = keySet;
+    }
+
+    @Override
+    public void onPropertyChange(String key)
+    {
+      long now = System.currentTimeMillis();
+      _keySet.put(key, Long.toString(now));
+    }
+  }
+
+  private class TestUpdater implements DataUpdater<ZNRecord>
+  {
+    @Override
+    public ZNRecord update(ZNRecord current)
+    {
+      char[] data = new char[bufSize];
+
+      for (int i = 0; i < bufSize; i++)
+      {
+        data[i] = 'e';
+      }
+
+      Map<String, String> map = new TreeMap<String, String>();
+      for (int i = 0; i < mapNr; i++)
+      {
+        map.put("key_" + i, new String(data));
+      }
+
+      String nodeId = current.getId();
+      ZNRecord record = new ZNRecord(nodeId);
+      record.setSimpleFields(map);
+      return record;
+    }
+  }
+
+  String getNodeId(int i, int j)
+  {
+    return "childNode_" + i + "_" + j;
+  }
+
+  String getSecondLevelKey(int i, int j)
+  {
+    return "/node_" + i + "/" + getNodeId(i, j);
+  }
+
+  String getFirstLevelKey(int i)
+  {
+    return "/node_" + i;
+  }
+
+  // TODO: separate into small tests
+  @Test()
+  public void testZKPropertyStore() throws Exception
+  {
+    System.out.println("START " + className + " at "
+        + new Date(System.currentTimeMillis()));
+    // LOG.info("number of connections is " + ZkClient.getNumberOfConnections());
+
+    // clean up zk
+    final String propertyStoreRoot = buildPropertyStoreRoot();
+    if (_gZkClient.exists(propertyStoreRoot))
+    {
+      _gZkClient.deleteRecursive(propertyStoreRoot);
+    }
+
+    ZkClient zkclient = new ZkClient(ZK_ADDR);
+    zkclient.setZkSerializer(new ByteArraySerializer());
+    ZKPropertyStore<ZNRecord> store =
+        new ZKPropertyStore<ZNRecord>(zkclient,
+                                      new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                      propertyStoreRoot);
+
+    // test back to back add-delete-add
+
+    store.setProperty("child0", new ZNRecord("child0"));
+
+    ZNRecord record2 = store.getProperty("child0"); // will put the record in cache
+    String child0Path = propertyStoreRoot + "/child0";
+    _gZkClient.subscribeDataChanges(child0Path, new IZkDataListener()
+    {
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception
+      {
+        // TODO Auto-generated method stub
+        System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataDeleted()");
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception
+      {
+        // TODO Auto-generated method stub
+        System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataChange()");
+      }
+    });
+    for (int i = 0; i < 2; i++)
+    {
+      _gZkClient.delete(child0Path);
+      _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new"));
+    }
+    record2 = store.getProperty("child0");
+    Assert.assertEquals(record2.getId(), "child0-new");
+    _gZkClient.delete(child0Path);
+    Thread.sleep(300); // should wait for zk callback to remove "child0" from cache
+    record2 = store.getProperty("child0");
+    Assert.assertNull(record2);
+
+    // zookeeper has a default 1M limit on size
+    char[] data = new char[bufSize];
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = 'a';
+    }
+
+    Map<String, String> map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+    String node = "node";
+    ZNRecord record = new ZNRecord(node);
+    record.setSimpleFields(map);
+
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    int bytesPerNode = serializer.serialize(record).length;
+    System.out.println("use znode of size " + bytesPerNode / 1024 + "K");
+    Assert.assertTrue(bytesPerNode < 1024 * 1024,
+                      "zookeeper has a default 1M limit on size");
+
+    // test getPropertyRootNamespace()
+    String root = store.getPropertyRootNamespace();
+    Assert.assertEquals(root, propertyStoreRoot);
+
+    // set 100 nodes and get 100 nodes, verify get what we set
+    long start = System.currentTimeMillis();
+    setNodes(store, 'a', false);
+    long end = System.currentTimeMillis();
+    System.out.println("ZKPropertyStore write throughput is " + bytesPerNode * totalNodes
+        / (end - start) + " kilo-bytes per second");
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < 10; i++)
+    {
+      for (int j = 0; j < 10; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        String key = getSecondLevelKey(i, j);
+        record = store.getProperty(key);
+        Assert.assertEquals(record.getId(), nodeId);
+      }
+    }
+    end = System.currentTimeMillis();
+    System.out.println("ZKPropertyStore read throughput is " + bytesPerNode * totalNodes
+        / (end - start) + " kilo-bytes per second");
+
+    // test subscribe
+    Map<String, String> keyMap = new TreeMap<String, String>();
+    // verify initial callbacks invoked for all 100 nodes
+    PropertyChangeListener<ZNRecord> listener = new TestListener(keyMap);
+    store.subscribeForPropertyChange("", listener);
+    System.out.println("keyMap size: " + keyMap.size());
+    Assert.assertTrue(keyMap.size() > 100);
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String key = getSecondLevelKey(i, j);
+        Assert.assertTrue(keyMap.containsKey(key));
+      }
+    }
+
+    // change nodes via property store interface
+    // and verify all notifications have been received (TODO: without latency?)
+    start = System.currentTimeMillis();
+    keyMap.clear();
+    setNodes(store, 'b', true);
+
+    // wait for all callbacks completed
+    for (int i = 0; i < 10; i++)
+    {
+      System.out.println("keySet size: " + keyMap.size());
+      if (keyMap.size() == totalNodes)
+      {
+        break;
+      }
+      Thread.sleep(500);
+    }
+    Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
+        + " callbacks");
+    end = System.currentTimeMillis();
+    long waitTime = (end - start) * 2;
+
+    long maxLatency = 0;
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String key = getSecondLevelKey(i, j);
+        Assert.assertTrue(keyMap.containsKey(key));
+        record = store.getProperty(key);
+        start = Long.parseLong(record.getSimpleField("SetTimestamp"));
+        end = Long.parseLong(keyMap.get(key));
+        long latency = end - start;
+        if (latency > maxLatency)
+        {
+          maxLatency = latency;
+        }
+      }
+    }
+    System.out.println("ZKPropertyStore callback latency is " + maxLatency
+        + " millisecond");
+
+    // change nodes via native zkclient interface
+    // and verify all notifications have been received with some latency
+    keyMap.clear();
+    setNodes(_gZkClient, propertyStoreRoot, 'a', true);
+
+    // wait for all callbacks completed
+    Thread.sleep(waitTime);
+    Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
+        + " callbacks");
+
+    // remove node via native zkclient interface
+    // should receive callbacks on parent key
+    keyMap.clear();
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      int j = 0;
+      String key = getSecondLevelKey(i, j);
+      _gZkClient.delete(propertyStoreRoot + key);
+    }
+    Thread.sleep(waitTime);
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      String key = getFirstLevelKey(i);
+      Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
+    }
+
+    keyMap.clear();
+    for (int j = 1; j < secondLevelNr; j++)
+    {
+      int i = 0;
+      String key = getSecondLevelKey(i, j);
+      _gZkClient.delete(propertyStoreRoot + key);
+    }
+    Thread.sleep(waitTime);
+    String node0Key = getFirstLevelKey(0);
+    Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
+        + node0Key);
+
+    // add back removed nodes
+    // should receive callbacks on parent key
+    keyMap.clear();
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = 'a';
+    }
+
+    map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      int j = 0;
+      String nodeId = getNodeId(i, j);
+      String key = getSecondLevelKey(i, j);
+      record = new ZNRecord(nodeId);
+      record.setSimpleFields(map);
+      store.setProperty(key, record);
+    }
+    Thread.sleep(waitTime);
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      String key = getFirstLevelKey(i);
+      Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
+    }
+
+    keyMap.clear();
+    for (int j = 1; j < secondLevelNr; j++)
+    {
+      int i = 0;
+      String nodeId = getNodeId(i, j);
+      String key = getSecondLevelKey(i, j);
+      record = new ZNRecord(nodeId);
+      record.setSimpleFields(map);
+      store.setProperty(key, record);
+    }
+    Thread.sleep(waitTime);
+    node0Key = getFirstLevelKey(0);
+    Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
+        + node0Key);
+
+    // test unsubscribe
+    store.unsubscribeForPropertyChange("", listener);
+    // change all nodes and verify no notification happens
+    keyMap.clear();
+    setNodes(store, 'c', false);
+    Thread.sleep(waitTime);
+    Assert.assertEquals(keyMap.size(), 0);
+
+    // test getPropertyNames
+    List<String> names = store.getPropertyNames("");
+    int cnt = 0;
+    for (String name : names)
+    {
+      int i = cnt / 10;
+      int j = cnt % 10;
+      cnt++;
+      String key = getSecondLevelKey(i, j);
+      Assert.assertEquals(name, key);
+    }
+
+    // test compare and set
+    char[] updateData = new char[bufSize];
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = 'c';
+      updateData[i] = 'd';
+    }
+
+    Map<String, String> updateMap = new TreeMap<String, String>();
+    for (int i = 0; i < 10; i++)
+    {
+      map.put("key_" + i, new String(data));
+      updateMap.put("key_" + i, new String(updateData));
+    }
+
+    PropertyJsonComparator<ZNRecord> comparator =
+        new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        record = new ZNRecord(nodeId);
+        record.setSimpleFields(map);
+        String key = getSecondLevelKey(i, j);
+
+        ZNRecord update = new ZNRecord(nodeId);
+        update.setSimpleFields(updateMap);
+        boolean succeed = store.compareAndSet(key, record, update, comparator);
+        Assert.assertTrue(succeed);
+        record = store.getProperty(key);
+        Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'd');
+      }
+    }
+
+    // test updateUntilSucceed
+    TestUpdater updater = new TestUpdater();
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String key = getSecondLevelKey(i, j);
+
+        store.updatePropertyUntilSucceed(key, updater);
+        record = store.getProperty(key);
+        Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'e');
+      }
+    }
+
+    // test exist
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String key = getSecondLevelKey(i, j);
+
+        boolean exist = store.exists(key);
+        Assert.assertTrue(exist);
+      }
+    }
+
+    // test removeProperty
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      int j = 0;
+
+      String key = getSecondLevelKey(i, j);
+
+      store.removeProperty(key);
+      record = store.getProperty(key);
+      Assert.assertNull(record);
+    }
+
+    // test removePropertyNamespace
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      String key = "/node_" + i;
+      store.removeNamespace(key);
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String key = getSecondLevelKey(i, j);
+
+        store.removeProperty(key);
+        record = store.getProperty(key);
+        Assert.assertNull(record);
+      }
+    }
+
+    System.out.println("END " + className + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private String buildPropertyStoreRoot()
+  {
+    return "/" + className;
+  }
+
+  private void setNodes(ZKPropertyStore<ZNRecord> store, char c, boolean needTimestamp) throws PropertyStoreException
+  {
+    char[] data = new char[bufSize];
+
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = c;
+    }
+
+    Map<String, String> map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        ZNRecord record = new ZNRecord(nodeId);
+        record.setSimpleFields(map);
+        if (needTimestamp)
+        {
+          long now = System.currentTimeMillis();
+          record.setSimpleField("SetTimestamp", Long.toString(now));
+        }
+        String key = getSecondLevelKey(i, j);
+        store.setProperty(key, record);
+      }
+    }
+  }
+
+  private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp) throws PropertyStoreException
+  {
+    char[] data = new char[bufSize];
+
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = c;
+    }
+
+    Map<String, String> map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        ZNRecord record = new ZNRecord(nodeId);
+        record.setSimpleFields(map);
+        if (needTimestamp)
+        {
+          long now = System.currentTimeMillis();
+          record.setSimpleField("SetTimestamp", Long.toString(now));
+        }
+        String key = getSecondLevelKey(i, j);
+        zkClient.writeData(root + key, record);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
new file mode 100644
index 0000000..e1e9140
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKPropertyStoreMultiThread extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZKPropertyStoreMultiThread.class);
+
+  private final String _root = "/" + getShortClassName();
+  private final ZNRecord _record = new ZNRecord("key1");
+
+
+  @BeforeClass()
+  public void beforeClass()
+  {
+    if (_gZkClient.exists(_root))
+    {
+      _gZkClient.deleteRecursive(_root);
+    }
+  }
+
+  public class TestCallableCAS implements Callable<Boolean>
+  {
+    @Override
+    public Boolean call()
+    {
+      try
+      {
+        ZKPropertyStore<ZNRecord> store
+          = new ZKPropertyStore<ZNRecord>(new ZkClient(ZK_ADDR),
+                                          new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                          _root);
+        PropertyJsonComparator<ZNRecord> comparator = new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+        long id = Thread.currentThread().getId();
+
+        store.setProperty("key1", _record);
+
+        boolean success;
+        do
+        {
+          ZNRecord current = store.getProperty("key1");
+          ZNRecord update = new ZNRecord(current);
+          update.setSimpleField("thread_" + id, "simpleValue");
+
+          success = store.compareAndSet("key1", current, update, comparator);
+        } while (!success);
+
+        return Boolean.TRUE;
+      }
+      catch (Exception e)
+      {
+        LOG.error(e);
+        return Boolean.FALSE;
+      }
+    }
+  }
+
+  @Test ()
+  public void testCmpAndSet()
+  {
+    System.out.println("START testCmpAndSet at" + new Date(System.currentTimeMillis()));
+
+    Map<String, Boolean> results = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableCAS(), 10);
+    Assert.assertEquals(results.size(), 5);
+    for (Boolean result : results.values())
+    {
+      Assert.assertTrue(result.booleanValue());
+    }
+
+    System.out.println("END testCmpAndSet at" + new Date(System.currentTimeMillis()));
+  }
+
+  private class TestUpdater implements DataUpdater<ZNRecord>
+  {
+
+    @Override
+    public ZNRecord update(ZNRecord current)
+    {
+      long id = Thread.currentThread().getId();
+
+      current.setSimpleField("thread_" + id, "simpleValue");
+      return current;
+    }
+
+  }
+
+  public class TestCallableUpdate implements Callable<Boolean>
+  {
+    @Override
+    public Boolean call()
+    {
+      try
+      {
+        ZKPropertyStore<ZNRecord> store
+          = new ZKPropertyStore<ZNRecord>(new ZkClient(ZK_ADDR),
+                                          new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                          _root);
+
+        store.setProperty("key2", _record);
+        store.updatePropertyUntilSucceed("key2", new TestUpdater());
+
+        return Boolean.TRUE;
+      }
+      catch (Exception e)
+      {
+        LOG.error(e);
+        return Boolean.FALSE;
+      }
+    }
+  }
+
+  @Test()
+  public void testUpdateUntilSucceed()
+  {
+    System.out.println("START testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
+
+    Map<String, Boolean> results = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableUpdate(), 10);
+    Assert.assertEquals(results.size(), 5);
+    for (Boolean result : results.values())
+    {
+      Assert.assertTrue(result.booleanValue());
+    }
+
+    System.out.println("END testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
new file mode 100644
index 0000000..1f6eaff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
@@ -0,0 +1,369 @@
+package org.apache.helix.store.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkHelixPropertyStore extends ZkUnitTestBase
+{
+  final String _root         = "/" + getShortClassName();
+  final int    bufSize       = 128;
+  final int    mapNr         = 10;
+  final int    firstLevelNr  = 10;
+  final int    secondLevelNr = 10;
+
+  // final int totalNodes = firstLevelNr * secondLevelNr;
+
+  class TestListener implements HelixPropertyListener
+  {
+    Map<String, Long> _changeKeys = new HashMap<String, Long>();
+    Map<String, Long> _createKeys = new HashMap<String, Long>();
+    Map<String, Long> _deleteKeys = new HashMap<String, Long>();
+
+    public void reset()
+    {
+      _changeKeys.clear();
+      _createKeys.clear();
+      _deleteKeys.clear();
+    }
+
+    @Override
+    public void onDataChange(String path)
+    {
+      _changeKeys.put(path, new Long(System.currentTimeMillis()));
+    }
+
+    @Override
+    public void onDataCreate(String path)
+    {
+      _createKeys.put(path, new Long(System.currentTimeMillis()));
+    }
+
+    @Override
+    public void onDataDelete(String path)
+    {
+      _deleteKeys.put(path, new Long(System.currentTimeMillis()));
+    }
+  }
+
+  @Test
+  public void testSet()
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+
+    System.out.println("START testSet() at " + new Date(System.currentTimeMillis()));
+
+    String subRoot = _root + "/" + "set";
+    List<String> subscribedPaths = new ArrayList<String>();
+    subscribedPaths.add(subRoot);
+    ZkHelixPropertyStore<ZNRecord> store =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                           subRoot,
+                                           subscribedPaths);
+
+    setNodes(store, 'a', false);
+    for (int i = 0; i < 10; i++)
+    {
+      for (int j = 0; j < 10; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        String key = getSecondLevelKey(i, j);
+        ZNRecord record = store.get(key, null, 0);
+        Assert.assertEquals(record.getId(), nodeId);
+      }
+    }
+
+    long startT = System.currentTimeMillis();
+    for (int i = 0; i < 1000; i++)
+    {
+      ZNRecord record = store.get("/node_0/childNode_0_0", null, 0);
+      Assert.assertNotNull(record);
+    }
+    long endT = System.currentTimeMillis();
+    System.out.println("1000 Get() time used: " + (endT - startT) + "ms");
+    Assert.assertTrue((endT - startT) < 60, "1000 Gets should be finished within 50ms");
+
+    store.stop();
+    System.out.println("END testSet() at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testLocalTriggeredCallback() throws Exception
+  {
+    System.out.println("START testLocalTriggeredCallback() at "
+        + new Date(System.currentTimeMillis()));
+
+    String subRoot = _root + "/" + "localCallback";
+    List<String> subscribedPaths = new ArrayList<String>();
+    subscribedPaths.add(subRoot);
+    ZkHelixPropertyStore<ZNRecord> store =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                           subRoot,
+                                           subscribedPaths);
+
+    // change nodes via property store interface
+    // and verify all notifications have been received
+    TestListener listener = new TestListener();
+    store.subscribe("/", listener);
+
+    // test dataCreate callbacks
+    listener.reset();
+    setNodes(store, 'a', true);
+
+    // wait until all callbacks have been received
+    Thread.sleep(500);
+    int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
+        + expectCreateNodes + " create callbacks");
+
+    // test dataChange callbacks
+    listener.reset();
+    setNodes(store, 'b', true);
+
+    // wait until all callbacks have been received
+    Thread.sleep(500);
+    int expectChangeNodes = firstLevelNr * secondLevelNr;
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
+                      "Should receive at least " + expectChangeNodes
+                          + " change callbacks");
+
+    // test delete callbacks
+    listener.reset();
+    int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+    store.remove("/", 0);
+    // wait until all callbacks have been received
+    Thread.sleep(500);
+
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
+        + expectDeleteNodes + " delete callbacks");
+
+    store.stop();
+    System.out.println("END testLocalTriggeredCallback() at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testZkTriggeredCallback() throws Exception
+  {
+    System.out.println("START testZkTriggeredCallback() at "
+        + new Date(System.currentTimeMillis()));
+
+    String subRoot = _root + "/" + "zkCallback";
+    List<String> subscribedPaths = Arrays.asList(subRoot);
+    ZkHelixPropertyStore<ZNRecord> store =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                           subRoot,
+                                           subscribedPaths);
+
+    // change nodes via property store interface
+    // and verify all notifications have been received
+    TestListener listener = new TestListener();
+    store.subscribe("/", listener);
+
+    // test create callbacks
+    listener.reset();
+    setNodes(_gZkClient, subRoot, 'a', true);
+    int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+    Thread.sleep(500);
+
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
+        + expectCreateNodes + " create callbacks");
+
+    // test change callbacks
+    listener.reset();
+    setNodes(_gZkClient, subRoot, 'b', true);
+    int expectChangeNodes = firstLevelNr * secondLevelNr;
+    Thread.sleep(500);
+
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
+                      "Should receive at least " + expectChangeNodes
+                          + " change callbacks");
+
+    // test delete callbacks
+    listener.reset();
+    int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+    _gZkClient.deleteRecursive(subRoot);
+    Thread.sleep(1000);
+
+    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+    Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
+        + expectDeleteNodes + " delete callbacks");
+
+    store.stop();
+    System.out.println("END testZkTriggeredCallback() at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testBackToBackRemoveAndSet() throws Exception
+  {
+    System.out.println("START testBackToBackRemoveAndSet() at "
+        + new Date(System.currentTimeMillis()));
+
+    String subRoot = _root + "/" + "backToBackRemoveAndSet";
+    List<String> subscribedPaths = new ArrayList<String>();
+    subscribedPaths.add(subRoot);
+    ZkHelixPropertyStore<ZNRecord> store =
+        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                           subRoot,
+                                           subscribedPaths);
+
+    store.set("/child0", new ZNRecord("child0"), AccessOption.PERSISTENT);
+
+    ZNRecord record = store.get("/child0", null, 0); // will put the record in cache
+    Assert.assertEquals(record.getId(), "child0");
+    // System.out.println("1:get:" + record);
+
+    String child0Path = subRoot + "/child0";
+    for (int i = 0; i < 2; i++)
+    {
+      _gZkClient.delete(child0Path);
+      _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new-" + i));
+    }
+
+    Thread.sleep(500); // should wait for zk callback to add "/child0" into cache
+    record = store.get("/child0", null, 0);
+    Assert.assertEquals(record.getId(),
+                        "child0-new-1",
+                        "Cache shoulde be updated to latest create");
+    // System.out.println("2:get:" + record);
+
+    _gZkClient.delete(child0Path);
+    Thread.sleep(500); // should wait for zk callback to remove "/child0" from cache
+    try
+    {
+      record = store.get("/child0", null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      Assert.fail("/child0 should have been removed");
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK.
+    }
+    // System.out.println("3:get:" + record);
+
+    store.stop();
+    System.out.println("END testBackToBackRemoveAndSet() at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  private String getNodeId(int i, int j)
+  {
+    return "childNode_" + i + "_" + j;
+  }
+
+  private String getSecondLevelKey(int i, int j)
+  {
+    return "/node_" + i + "/" + getNodeId(i, j);
+  }
+
+  private String getFirstLevelKey(int i)
+  {
+    return "/node_" + i;
+  }
+
+  private void setNodes(ZkHelixPropertyStore<ZNRecord> store,
+                        char c,
+                        boolean needTimestamp)
+  {
+    char[] data = new char[bufSize];
+
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = c;
+    }
+
+    Map<String, String> map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        ZNRecord record = new ZNRecord(nodeId);
+        record.setSimpleFields(map);
+        if (needTimestamp)
+        {
+          long now = System.currentTimeMillis();
+          record.setSimpleField("SetTimestamp", Long.toString(now));
+        }
+        String key = getSecondLevelKey(i, j);
+        store.set(key, record, AccessOption.PERSISTENT);
+      }
+    }
+  }
+
+  private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp)
+  {
+    char[] data = new char[bufSize];
+
+    for (int i = 0; i < bufSize; i++)
+    {
+      data[i] = c;
+    }
+
+    Map<String, String> map = new TreeMap<String, String>();
+    for (int i = 0; i < mapNr; i++)
+    {
+      map.put("key_" + i, new String(data));
+    }
+
+    for (int i = 0; i < firstLevelNr; i++)
+    {
+      String firstLevelKey = getFirstLevelKey(i);
+
+      for (int j = 0; j < secondLevelNr; j++)
+      {
+        String nodeId = getNodeId(i, j);
+        ZNRecord record = new ZNRecord(nodeId);
+        record.setSimpleFields(map);
+        if (needTimestamp)
+        {
+          long now = System.currentTimeMillis();
+          record.setSimpleField("SetTimestamp", Long.toString(now));
+        }
+        String key = getSecondLevelKey(i, j);
+        try
+        {
+          zkClient.writeData(root + key, record);
+        }
+        catch (ZkNoNodeException e)
+        {
+          zkClient.createPersistent(root + firstLevelKey, true);
+          zkClient.createPersistent(root + key, record);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
new file mode 100644
index 0000000..03ff074
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkPropertyStoreSessionExpiry extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZkPropertyStoreSessionExpiry.class);
+
+  private class TestPropertyChangeListener
+  implements PropertyChangeListener<String>
+  {
+    public boolean _propertyChangeReceived = false;
+
+    @Override
+    public void onPropertyChange(String key)
+    {
+      // TODO Auto-generated method stub
+      LOG.info("property change, " + key);
+      _propertyChangeReceived = true;
+    }
+  }
+
+	ZkClient _zkClient;
+
+	@BeforeClass
+	public void beforeClass()
+	{
+		_zkClient = new ZkClient(ZK_ADDR);
+		_zkClient.setZkSerializer(new ZNRecordSerializer());
+	}
+
+	@AfterClass
+	public void afterClass()
+	{
+		_zkClient.close();
+	}
+
+
+  @Test()
+  public void testZkPropertyStoreSessionExpiry() throws Exception
+  {
+    LOG.info("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    PropertyJsonSerializer<String> serializer = new PropertyJsonSerializer<String>(String.class);
+
+    ZkConnection zkConn = new ZkConnection(ZK_ADDR);
+
+    final String propertyStoreRoot = "/" + getShortClassName();
+    if (_zkClient.exists(propertyStoreRoot))
+    {
+      _zkClient.deleteRecursive(propertyStoreRoot);
+    }
+
+    ZKPropertyStore<String> zkPropertyStore 
+      = new ZKPropertyStore<String>(new ZkClient(zkConn), serializer, propertyStoreRoot);
+
+    zkPropertyStore.setProperty("/child1/grandchild1", "grandchild1");
+    zkPropertyStore.setProperty("/child1/grandchild2", "grandchild2");
+
+    TestPropertyChangeListener listener = new TestPropertyChangeListener();
+    zkPropertyStore.subscribeForPropertyChange("", listener);
+
+    listener._propertyChangeReceived = false;
+    zkPropertyStore.setProperty("/child2/grandchild3", "grandchild3");
+    Thread.sleep(100);
+    AssertJUnit.assertEquals(listener._propertyChangeReceived, true);
+
+    simulateSessionExpiry(zkConn);
+
+    listener._propertyChangeReceived = false;
+    zkPropertyStore.setProperty("/child2/grandchild4", "grandchild4");
+    Thread.sleep(100);
+    AssertJUnit.assertEquals(listener._propertyChangeReceived, true);
+
+    LOG.info("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
new file mode 100644
index 0000000..6c0c8c4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -0,0 +1,494 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestClusterSetup extends ZkUnitTestBase
+{
+  private static Logger         LOG             =
+                                                    Logger.getLogger(TestClusterSetup.class);
+
+  protected static final String CLUSTER_NAME    = "TestClusterSetup";
+  protected static final String TEST_DB         = "TestDB";
+  protected static final String INSTANCE_PREFIX = "instance:";
+  protected static final String STATE_MODEL     = "MasterSlave";
+  protected static final String TEST_NODE       = "testnode:1";
+
+  ZkClient                      _zkClient;
+  ClusterSetup                  _clusterSetup;
+
+  String instanceColonToUnderscoreFormat(String colonFormat)
+  {
+    int lastPos = colonFormat.lastIndexOf(":");
+    if (lastPos <= 0)
+    {
+      String error = "Invalid storage Instance info format: " + colonFormat;
+      LOG.warn(error);
+      throw new HelixException(error);
+    }
+    String host = colonFormat.substring(0, lastPos);
+    String portStr = colonFormat.substring(lastPos + 1);
+    return host + "_" + portStr;
+  }
+
+  private static String[] createArgs(String str)
+  {
+    String[] split = str.split("[ ]+");
+    System.out.println(Arrays.toString(split));
+    return split;
+  }
+
+  @BeforeClass()
+  public void beforeClass() throws IOException,
+      Exception
+  {
+    System.out.println("START TestClusterSetup.beforeClass() "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass()
+  public void afterClass()
+  {
+    _zkClient.close();
+    System.out.println("END TestClusterSetup.afterClass() "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @BeforeMethod()
+  public void setup()
+  {
+
+    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
+    _clusterSetup = new ClusterSetup(ZK_ADDR);
+    _clusterSetup.addCluster(CLUSTER_NAME, true);
+  }
+
+  @Test()
+  public void testAddInstancesToCluster() throws Exception
+  {
+    String instanceAddresses[] = new String[3];
+    for (int i = 0; i < 3; i++)
+    {
+      String currInstance = INSTANCE_PREFIX + i;
+      instanceAddresses[i] = currInstance;
+    }
+    String nextInstanceAddress = INSTANCE_PREFIX + 3;
+
+    _clusterSetup.addInstancesToCluster(CLUSTER_NAME, instanceAddresses);
+
+    // verify instances
+    for (String instance : instanceAddresses)
+    {
+      verifyInstance(_zkClient,
+                     CLUSTER_NAME,
+                     instanceColonToUnderscoreFormat(instance),
+                     true);
+    }
+
+    _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
+    verifyInstance(_zkClient,
+                   CLUSTER_NAME,
+                   instanceColonToUnderscoreFormat(nextInstanceAddress),
+                   true);
+    // re-add
+    boolean caughtException = false;
+    try
+    {
+      _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+
+    // bad instance format
+    String badFormatInstance = "badinstance";
+    caughtException = false;
+    try
+    {
+      _clusterSetup.addInstanceToCluster(CLUSTER_NAME, badFormatInstance);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+
+  }
+
+  @Test()
+  public void testDisableDropInstancesFromCluster() throws Exception
+  {
+    testAddInstancesToCluster();
+    String instanceAddresses[] = new String[3];
+    for (int i = 0; i < 3; i++)
+    {
+      String currInstance = INSTANCE_PREFIX + i;
+      instanceAddresses[i] = currInstance;
+    }
+    String nextInstanceAddress = INSTANCE_PREFIX + 3;
+
+    boolean caughtException = false;
+    // drop without disabling
+    try
+    {
+      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+
+    // disable
+    _clusterSetup.getClusterManagementTool()
+                 .enableInstance(CLUSTER_NAME,
+                                 instanceColonToUnderscoreFormat(nextInstanceAddress),
+                                 false);
+    verifyEnabled(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(nextInstanceAddress),
+                  false);
+
+    // drop
+    _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+    verifyInstance(_zkClient,
+                   CLUSTER_NAME,
+                   instanceColonToUnderscoreFormat(nextInstanceAddress),
+                   false);
+
+    // re-drop
+    caughtException = false;
+    try
+    {
+      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+    /*
+     * //drop a set _clusterSetup.getClusterManagementTool().enableInstances(CLUSTER_NAME,
+     * instanceAddresses, false); _clusterSetup.dropInstancesFromCluster(CLUSTER_NAME,
+     * instanceAddresses);
+     */
+
+    // bad format disable, drop
+    String badFormatInstance = "badinstance";
+    caughtException = false;
+    try
+    {
+      _clusterSetup.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+                                                              badFormatInstance,
+                                                              false);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+
+    caughtException = false;
+    try
+    {
+      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, badFormatInstance);
+    }
+    catch (HelixException e)
+    {
+      caughtException = true;
+    }
+    AssertJUnit.assertTrue(caughtException);
+  }
+
+  @Test()
+  public void testAddResource() throws Exception
+  {
+    try
+    {
+      _clusterSetup.addResourceToCluster(CLUSTER_NAME, TEST_DB, 16, STATE_MODEL);
+    }
+    catch(Exception e)
+    {}
+    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+  }
+
+  @Test()
+  public void testRemoveResource() throws Exception
+  {
+    _clusterSetup.setupTestCluster(CLUSTER_NAME);
+    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+    _clusterSetup.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
+    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, false);
+  }
+
+  @Test()
+  public void testRebalanceCluster() throws Exception
+  {
+    _clusterSetup.setupTestCluster(CLUSTER_NAME);
+    // testAddInstancesToCluster();
+    testAddResource();
+    _clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 4);
+    verifyReplication(_zkClient, CLUSTER_NAME, TEST_DB, 4);
+  }
+
+  /*
+   * @Test (groups = {"unitTest"}) public void testPrintUsage() throws Exception { Options
+   * cliOptions = ClusterSetup.constructCommandLineOptions();
+   * ClusterSetup.printUsage(null); }
+   */
+
+  @Test()
+  public void testParseCommandLinesArgs() throws Exception
+  {
+    // ClusterSetup
+    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+ " help"));
+
+    // wipe ZK
+    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
+    _clusterSetup = new ClusterSetup(ZK_ADDR);
+
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addCluster "
+        + CLUSTER_NAME));
+
+    // wipe again
+    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
+    _clusterSetup = new ClusterSetup(ZK_ADDR);
+
+    _clusterSetup.setupTestCluster(CLUSTER_NAME);
+
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addNode "
+        + CLUSTER_NAME + " " + TEST_NODE));
+    verifyInstance(_zkClient,
+                   CLUSTER_NAME,
+                   instanceColonToUnderscoreFormat(TEST_NODE),
+                   true);
+    try
+    {
+      ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+        + " --addResource " + CLUSTER_NAME + " " + TEST_DB + " 4 " + STATE_MODEL));
+    }
+    catch(Exception e)
+    {
+      
+    }
+    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+    // ClusterSetup
+    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --addNode node-1"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+        + " --enableInstance " + CLUSTER_NAME + " "
+        + instanceColonToUnderscoreFormat(TEST_NODE) + " true"));
+    verifyEnabled(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(TEST_NODE),
+                  true);
+
+    // TODO: verify list commands
+    /*
+     * ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusterInfo "
+     * +CLUSTER_NAME)); ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusters"));
+     * ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listInstanceInfo "
+     * +CLUSTER_NAME+" "+instanceColonToUnderscoreFormat(TEST_NODE))); ClusterSetup
+     * .processCommandLineArgs
+     * (createArgs("-zkSvr "+ZK_ADDR+" --listInstances "+CLUSTER_NAME)); ClusterSetup
+     * .processCommandLineArgs
+     * (createArgs("-zkSvr "+ZK_ADDR+" --listResourceInfo "+CLUSTER_NAME +" "+TEST_DB));
+     * ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listResources "
+     * +CLUSTER_NAME)); ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listStateModel "
+     * +CLUSTER_NAME+" "+STATE_MODEL)); ClusterSetup
+     * .processCommandLineArgs(createArgs("-zkSvr "
+     * +ZK_ADDR+" --listStateModels "+CLUSTER_NAME));
+     */
+    // ClusterSetup
+    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --rebalance "+CLUSTER_NAME+" "+TEST_DB+" 1"));
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+        + " --enableInstance " + CLUSTER_NAME + " "
+        + instanceColonToUnderscoreFormat(TEST_NODE) + " false"));
+    verifyEnabled(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(TEST_NODE),
+                  false);
+    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --dropNode "
+        + CLUSTER_NAME + " " + TEST_NODE));
+  }
+
+  @Test()
+  public void testSetGetConfig() throws Exception
+  {
+    System.out.println("START testSetGetConfig() " + new Date(System.currentTimeMillis()));
+
+    // basic
+    String scopesStr = "CLUSTER=" + CLUSTER_NAME + ",PARTICIPANT=localhost_0";
+    String propertiesStr = "key1=value1,key2=value2";
+    String keysStr = "key1,key2";
+    _clusterSetup.setConfig(scopesStr, propertiesStr);
+    String valuesStr = _clusterSetup.getConfig(scopesStr, keysStr);
+    Assert.assertEquals(valuesStr, propertiesStr);
+
+    System.out.println("END testSetGetConfig() " + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  public void testEnableCluster() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // pause cluster
+    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+        "--enableCluster", clusterName, "false" });
+
+    Builder keyBuilder = new Builder(clusterName);
+    boolean exists = _gZkClient.exists(keyBuilder.pause().getPath());
+    Assert.assertTrue(exists, "pause node under controller should be created");
+
+    // resume cluster
+    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+        "--enableCluster", clusterName, "true" });
+
+    exists = _gZkClient.exists(keyBuilder.pause().getPath());
+    Assert.assertFalse(exists, "pause node under controller should be removed");
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  public void testDropInstance() throws Exception
+  {
+    // drop without stop, should throw exception
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // add fake liveInstance
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = new Builder(clusterName);
+    LiveInstance liveInstance = new LiveInstance("localhost_12918");
+    liveInstance.setSessionId("session_0");
+    liveInstance.setHelixVersion("version_0");
+    accessor.setProperty(keyBuilder.liveInstance("localhost_12918"), liveInstance);
+
+    // drop without stop the process, should throw exception
+    try
+    {
+      ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+          "--dropNode", clusterName, "localhost:12918" });
+      Assert.fail("Should throw exception since localhost_12918 is still in LIVEINSTANCES/");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    accessor.removeProperty(keyBuilder.liveInstance("localhost_12918"));
+
+    // drop without disable, should throw exception
+    try
+    {
+      ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+          "--dropNode", clusterName, "localhost:12918" });
+      Assert.fail("Should throw exception since localhost_12918 is enabled");
+    }
+    catch (Exception e)
+    {
+      // e.printStackTrace();
+      // OK
+    }
+
+    // drop it
+    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+        "--enableInstance", clusterName, "localhost_12918", "false" });
+    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR, "--dropNode",
+        clusterName, "localhost:12918" });
+
+    Assert.assertNull(accessor.getProperty(keyBuilder.instanceConfig("localhost_12918")),
+                      "Instance config should be dropped");
+    Assert.assertFalse(_gZkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
+                                                                    clusterName,
+                                                                    "localhost_12918")),
+                       "Instance/host should be dropped");
+    
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
new file mode 100644
index 0000000..9b9fcb0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
@@ -0,0 +1,423 @@
+package org.apache.helix.tools;
+/*
+ * Simulate all the admin tasks needed by using command line tool
+ * 
+ * */
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHelixAdminScenariosCli extends ZkIntegrationTestBase
+{
+  Map<String, StartCMResult> _startCMResultMap =
+        new HashMap<String, StartCMResult>();
+  
+  public static String ObjectToJson(Object object) throws JsonGenerationException,
+    JsonMappingException,
+    IOException
+  {
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, object);
+    
+    return sw.toString();
+  }
+
+  
+  @Test
+  public void testAddDeleteClusterAndInstanceAndResource() throws Exception
+  {
+    // Helix bug helix-102
+    //ZKPropertyTransferServer.PERIOD = 500;
+    //ZkPropertyTransferClient.SEND_PERIOD = 500;
+    //ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
+    
+    /**======================= Add clusters ==============================*/
+    
+    testAddCluster();
+    
+    /**================= Add / drop some resources ===========================*/
+    
+    testAddResource();
+    
+    /**====================== Add / delete  instances ===========================*/
+    
+    testAddInstance();
+    
+    /**===================== Rebalance resource ===========================*/
+    
+    testRebalanceResource();
+    
+    /**====================  start the clusters =============================*/
+    
+    testStartCluster();
+    
+    /**====================  drop add resource in live clusters ===================*/
+    testDropAddResource();
+    /**======================Operations with live node ============================*/
+   
+    testInstanceOperations();
+    
+    /**============================ expand cluster ===========================*/
+   
+    testExpandCluster();
+      
+    /**============================ deactivate cluster ===========================*/
+    testDeactivateCluster();
+    
+  }
+  
+  void assertClusterSetupException(String command)
+  {
+    boolean exceptionThrown = false;
+    try
+    {
+      ClusterSetup.processCommandLineArgs(command.split(" "));
+    }
+    catch(Exception e)
+    {
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+  }
+  
+  public void testAddCluster() throws Exception
+  {
+    String command = "--zkSvr localhost:2183 -addCluster clusterTest";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // malformed cluster name 
+    command = "--zkSvr localhost:2183 -addCluster /ClusterTest";
+    assertClusterSetupException(command);
+    
+    // Add the grand cluster
+    command = "--zkSvr localhost:2183 -addCluster \"Klazt3rz";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "--zkSvr localhost:2183 -addCluster \\ClusterTest";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // Add already exist cluster
+    command = "--zkSvr localhost:2183 -addCluster clusterTest";
+    assertClusterSetupException(command);
+    
+    // delete cluster without resource and instance
+    Assert.assertTrue(ZKUtil.isClusterSetup("Klazt3rz", _gZkClient));
+    Assert.assertTrue(ZKUtil.isClusterSetup("clusterTest", _gZkClient));
+    Assert.assertTrue(ZKUtil.isClusterSetup("\\ClusterTest", _gZkClient));
+    
+    command = "-zkSvr localhost:2183 -dropCluster \\ClusterTest";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -dropCluster clusterTest";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    Assert.assertFalse(_gZkClient.exists("/clusterTest"));
+    Assert.assertFalse(_gZkClient.exists("/\\ClusterTest"));
+    Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
+    
+    command = "-zkSvr localhost:2183 -addCluster clusterTest1";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+  
+  public void testAddResource() throws Exception
+  {
+    String command = "-zkSvr localhost:2183 -addResource clusterTest1 db_22 144 MasterSlave";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 44 MasterSlave";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // Add duplicate resource 
+    command = "-zkSvr localhost:2183 -addResource clusterTest1 db_22 55 OnlineOffline";
+    assertClusterSetupException(command);
+    
+    // drop resource now
+    command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 44 MasterSlave";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+
+  private void testDeactivateCluster() throws Exception, InterruptedException
+  {
+    String command;
+    HelixDataAccessor accessor;
+    String path;
+    // deactivate cluster
+    command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazt3rz false";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    Thread.sleep(6000);
+    
+    accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+    path = accessor.keyBuilder().controllerLeader().getPath();
+    Assert.assertFalse(_gZkClient.exists(path));
+    
+    command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+    assertClusterSetupException(command);
+    
+    // leader node should be gone
+    for(StartCMResult result : _startCMResultMap.values())
+    {
+      result._manager.disconnect();
+      result._thread.interrupt();
+    }
+
+    command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -dropCluster Klazt3rz";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+  
+  private void testDropAddResource() throws Exception
+  {
+    ZNRecord record = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+    String x = ObjectToJson(record);
+    
+    FileWriter fos = new FileWriter("/tmp/temp.log"); 
+    PrintWriter pw = new PrintWriter(fos);
+    pw.write(x);
+    pw.close();
+    
+    String command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    boolean verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+    
+    command = "-zkSvr localhost:2183 -addIdealState clusterTest1 db_11 \"/tmp/temp.log\"";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+    
+    ZNRecord record2 = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+    Assert.assertTrue(record2.equals(record));
+  }
+
+  private void testExpandCluster() throws Exception
+  {
+    String command;
+    HelixDataAccessor accessor;
+    boolean verifyResult;
+    String path;
+    
+    command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:12331;localhost:12341;localhost:12351;localhost:12361";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -expandCluster clusterTest1";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    for(int i = 3; i<= 6; i++)
+    {
+      StartCMResult result =
+          TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123"+i+"1");
+      _startCMResultMap.put("localhost_123"+i + "1", result);
+    }
+    
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+    
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+  }
+
+  private void testInstanceOperations() throws Exception
+  {
+    String command;
+    HelixDataAccessor accessor;
+    boolean verifyResult;
+    // drop node should fail as not disabled
+    command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1232";
+    assertClusterSetupException(command);
+    
+    // disabled node
+    command = "-zkSvr localhost:2183 -enableInstance clusterTest1 localhost:1232 false";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // Cannot drop / swap
+    command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1232";
+    assertClusterSetupException(command);
+    
+    command = "-zkSvr localhost:2183 -swapInstance clusterTest1 localhost_1232 localhost_12320";
+    assertClusterSetupException(command);
+    
+    // disconnect the node
+    _startCMResultMap.get("localhost_1232")._manager.disconnect();
+    _startCMResultMap.get("localhost_1232")._thread.interrupt();
+    
+    // add new node then swap instance
+    command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:12320";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // swap instance. The instance get swapped out should not exist anymore
+    command = "-zkSvr localhost:2183 -swapInstance clusterTest1 localhost_1232 localhost_12320";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+    String path = accessor.keyBuilder().instanceConfig("localhost_1232").getPath();
+    Assert.assertFalse(_gZkClient.exists(path));
+    
+    _startCMResultMap.put("localhost_12320", TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_12320"));
+  }
+
+  private void testStartCluster() throws Exception, InterruptedException
+  {
+    String command;
+    //start mock nodes
+    for(int i = 0; i < 6 ; i++)
+    {
+      StartCMResult result =
+          TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123"+i);
+      _startCMResultMap.put("localhost_123"+i, result);
+    }
+    
+    //start controller nodes
+    for(int i = 0; i< 2; i++)
+    {
+      StartCMResult result =
+          TestHelper.startController("Klazt3rz", "controller_900" + i, ZK_ADDR, HelixControllerMain.DISTRIBUTED);
+          
+      _startCMResultMap.put("controller_900" + i, result);
+    }
+    Thread.sleep(100);
+    
+    // activate clusters
+    // wrong grand clustername
+    command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazters true";
+    assertClusterSetupException(command);
+    
+    // wrong cluster name
+    command = "-zkSvr localhost:2183 -activateCluster clusterTest2 Klazt3rs true";
+    assertClusterSetupException(command);
+    
+    command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazt3rz true";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    Thread.sleep(500);
+    
+    command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+    assertClusterSetupException(command);
+    
+    // verify leader node
+    HelixDataAccessor accessor = _startCMResultMap.get("controller_9001")._manager.getHelixDataAccessor();
+    LiveInstance controllerLeader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+    Assert.assertTrue(controllerLeader.getInstanceName().startsWith("controller_900"));
+    
+    accessor = _startCMResultMap.get("localhost_1232")._manager.getHelixDataAccessor();
+    LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+    Assert.assertTrue(leader.getInstanceName().startsWith("controller_900"));
+    
+    boolean verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+    
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 "clusterTest1"));
+    Assert.assertTrue(verifyResult);
+  }
+
+  private void testRebalanceResource() throws Exception
+  {
+    String command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_11 3";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // re-add and rebalance
+    command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 48 MasterSlave";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_11 3";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // rebalance with key prefix
+    command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_22 2 -key alias";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+
+  private void testAddInstance() throws Exception
+  {
+    String command;
+    for(int i = 0; i < 3; i++)
+    {
+      command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:123"+i;
+      ClusterSetup.processCommandLineArgs(command.split(" "));
+    }
+    command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:1233;localhost:1234;localhost:1235;localhost:1236";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // delete one node without disable
+    command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1236";
+    assertClusterSetupException(command);
+    
+    // delete non-exist node
+    command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:12367";
+    assertClusterSetupException(command);
+    
+    // disable node
+    command = "-zkSvr localhost:2183 -enableInstance clusterTest1 localhost:1236 false";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1236";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // add node to controller cluster
+    command = "-zkSvr localhost:2183 -addNode Klazt3rz controller:9000;controller:9001";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    // add a dup host
+    command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:1234";
+    assertClusterSetupException(command);
+    
+    // drop and add resource
+    command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 12 MasterSlave";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java b/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
new file mode 100644
index 0000000..509d6c0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.ZKClientPool;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZKClientPool
+{
+
+  @Test
+  public void test() throws Exception
+  {
+    String testName = "TestZKClientPool";
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String zkAddr = "localhost:2187";
+    ZkServer zkServer = TestHelper.startZkSever(zkAddr);
+    ZkClient zkClient = ZKClientPool.getZkClient(zkAddr);
+    
+    zkClient.createPersistent("/" + testName, new ZNRecord(testName));
+    ZNRecord record = zkClient.readData("/" + testName);
+    Assert.assertEquals(record.getId(), testName);
+    
+    TestHelper.stopZkServer(zkServer);
+    
+    // restart zk 
+    zkServer = TestHelper.startZkSever(zkAddr);
+    try
+    {
+      zkClient = ZKClientPool.getZkClient(zkAddr);
+      record = zkClient.readData("/" + testName);
+      Assert.fail("should fail on zk no node exception");
+    } catch (ZkNoNodeException e)
+    {
+      // OK
+    } catch (Exception e)
+    {
+      Assert.fail("should not fail on exception other than ZkNoNodeException");
+    }
+    
+    zkClient.createPersistent("/" + testName, new ZNRecord(testName));
+    record = zkClient.readData("/" + testName);
+    Assert.assertEquals(record.getId(), testName);
+    
+    zkClient.close();
+    TestHelper.stopZkServer(zkServer);
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/CMConnector.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/CMConnector.java b/mockservice/src/main/java/org/apache/helix/CMConnector.java
new file mode 100644
index 0000000..5eaa8a1
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/CMConnector.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+
+
+public class CMConnector {
+
+	HelixManager _manager;
+
+	public CMConnector(final String clusterName, final String instanceName, final String zkAddr) throws Exception //, final ZkClient zkClient) throws Exception
+	{
+		 _manager = null;
+		 _manager = HelixManagerFactory
+		            .getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr); //, zkClient);
+		 _manager.connect();
+	}
+
+	public HelixManager getManager() {
+		return _manager;
+	}
+
+	public void disconnect() {
+		_manager.disconnect();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoResource.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoResource.java b/mockservice/src/main/java/org/apache/helix/EspressoResource.java
new file mode 100644
index 0000000..30e8853
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoResource.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.Context;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.data.Status;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.StringRepresentation;
+import org.restlet.resource.Variant;
+
+
+public class EspressoResource extends Resource {
+	
+	private static final int POST_BODY_BUFFER_SIZE = 1024*10;
+	
+	private static final Logger logger = Logger
+			.getLogger(EspressoResource.class);
+	
+	Context _context;
+	
+	 public EspressoResource(Context context,
+	            Request request,
+	            Response response) 
+	  {
+	    super(context, request, response);
+	    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+	    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+	    _context = context;
+	  }
+	 
+	 public boolean allowGet()
+	  {
+		 System.out.println("PutResource.allowGet()");
+	    return true;
+	  }
+	  
+	  public boolean allowPost()
+	  {
+		  System.out.println("PutResource.allowPost()");
+	    return true;
+	  }
+	  
+	  public boolean allowPut()
+	  {
+		  System.out.println("PutResource.allowPut()");
+	    return true;
+	  }
+	  
+	  public boolean allowDelete()
+	  {
+	    return false;
+	  }
+	  
+	  /*
+	   * Handle get requests
+	   * @see org.restlet.resource.Resource#represent(org.restlet.resource.Variant)
+	   */
+	  public Representation represent(Variant variant)
+	  {
+	    StringRepresentation presentation = null;
+	    try
+	    {
+	    	String databaseId = (String)getRequest().getAttributes().get(MockEspressoService.DATABASENAME);
+	    	String tableId = (String)getRequest().getAttributes().get(MockEspressoService.TABLENAME);
+	    	String resourceId = (String)getRequest().getAttributes().get(MockEspressoService.RESOURCENAME);
+	    	String subResourceId = (String)getRequest().getAttributes().get(MockEspressoService.SUBRESOURCENAME);
+	    	logger.debug("Done getting request components");
+	    	logger.debug("method: "+getRequest().getMethod());
+	    	String composedKey = databaseId + tableId + resourceId; // + subResourceId;
+	    	EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+
+	    	
+	    	if (getRequest().getMethod() == Method.PUT) {
+	    		logger.debug("processing PUT");
+	    		Reader postBodyReader;
+	    		//TODO: get to no fixed size on buffer
+	    		char[] postBody = new char[POST_BODY_BUFFER_SIZE];	    		
+	    		postBodyReader = getRequest().getEntity().getReader();
+	    		postBodyReader.read(postBody);
+	    		logger.debug("postBody: "+new String(postBody));
+	    		mock.doPut(databaseId, composedKey, new String(postBody));
+	    		presentation = new StringRepresentation("Put succeeded", MediaType.APPLICATION_JSON);
+	    	}
+	    	else if (getRequest().getMethod() == Method.GET) {
+	    		logger.debug("processing GET");
+	    		String result = mock.doGet(databaseId, composedKey);
+			      logger.debug("result: "+result);
+			      if (result == null) {
+			    	  presentation = new StringRepresentation("Record not found", MediaType.APPLICATION_JSON);
+			    	  getResponse().setStatus(Status.CLIENT_ERROR_NOT_FOUND,"Record not found");
+			      }
+			      else {
+			    	  getResponse().setStatus(Status.SUCCESS_OK,"Success");
+			    	  presentation = new StringRepresentation(result, MediaType.APPLICATION_JSON);
+			      }
+	    	}
+	    }
+
+	    catch (IOException e) {
+	    	presentation = new StringRepresentation(e.getMessage(), MediaType.APPLICATION_JSON);
+	    	e.printStackTrace();
+	    }
+
+	    catch(Exception e)
+	    {
+	    	String error = "Error with op"; //ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+	    	presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);	      
+	    	e.printStackTrace();
+	    }  
+	    return presentation;
+	  }
+	  
+	  /*
+	   * Handle put requests (non-Javadoc)
+	   * @see org.restlet.resource.Resource#storeRepresentation(org.restlet.resource.Representation)
+	   */
+	 public void storeRepresentation(Representation entity) throws ResourceException {
+		 logger.debug("in storeRepresentation");
+		 StringRepresentation presentation = null;
+		// try {
+		 Form requestHeaders = (Form) getRequest().getAttributes().get("org.restlet.http.headers");
+		 Map<String, String> headerMap = requestHeaders.getValuesMap();
+		 logger.debug("HEADERS MAP");
+		 for (String key : headerMap.keySet()) {
+			 logger.debug(key+" : "+headerMap.get(key));
+		 }
+	//	} catch (IOException e1) {
+			// TODO Auto-generated catch block
+			//e1.printStackTrace();
+		//}   
+		 try
+		    {
+		    	logger.debug("in PutResource handle");
+		    	String databaseId = (String)getRequest().getAttributes().get(MockEspressoService.DATABASENAME);
+		    	String tableId = (String)getRequest().getAttributes().get(MockEspressoService.TABLENAME);
+		    	String resourceId = (String)getRequest().getAttributes().get(MockEspressoService.RESOURCENAME);
+		    	String subResourceId = (String)getRequest().getAttributes().get(MockEspressoService.SUBRESOURCENAME);
+		    	logger.debug("Done getting request components");
+		    	logger.debug("method: "+getRequest().getMethod());
+		    	String composedKey = databaseId + tableId + resourceId; // + subResourceId;
+		    	EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+
+		    	if (getRequest().getMethod() == Method.PUT) {
+		    		logger.debug("processing PUT");
+		    		Reader postBodyReader;
+		    		//TODO: get to no fixed size on buffer
+		    		char[] postBody = new char[POST_BODY_BUFFER_SIZE];
+		    		postBodyReader = getRequest().getEntity().getReader();
+		    		postBodyReader.read(postBody);
+		    		logger.debug("postBody: "+new String(postBody));
+		    		mock.doPut(databaseId, composedKey, new String(postBody));
+		    		presentation = new StringRepresentation("Put succeeded", MediaType.APPLICATION_JSON);
+		    	}
+		    	else if (getRequest().getMethod() == Method.GET) {
+		    		logger.debug("Processing GET");
+		    		String result = mock.doGet(databaseId, composedKey);
+				      logger.debug("result: "+result);
+				      if (result == null) {
+				    	  presentation = new StringRepresentation("Record not found", MediaType.APPLICATION_JSON);
+				      }
+				      else {
+				    	  presentation = new StringRepresentation(result, MediaType.APPLICATION_JSON);
+				      }
+		    	}
+		    }
+
+		    catch (IOException e) {
+		    	presentation = new StringRepresentation(e.getMessage(), MediaType.APPLICATION_JSON);
+		    	e.printStackTrace();
+		    }
+
+		    catch(Exception e)
+		    {
+		    	String error = "Error with op";
+		    	presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);	      
+		    	e.printStackTrace();
+		    }  
+		    finally {
+		    	entity.release();
+		    }
+	 }
+}