You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2012/10/25 00:26:41 UTC

[45/47] Refactoring from com.linkedin.helix to org.apache.helix
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/ b/helix-core/src/test/java/com/linkedin/helix/store/
deleted file mode 100644
index 8359d6c..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/
+++ /dev/null
@@ -1,35 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.testng.annotations.Test;
-import org.testng.AssertJUnit;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-public class TestPropertyStat
-  @Test (groups = {"unitTest"})
-  public void testPropertyStat()
-  {
-    PropertyStat stat = new PropertyStat(0, 0);
-    AssertJUnit.assertEquals(0, stat.getLastModifiedTime());
-    AssertJUnit.assertEquals(0, stat.getVersion());
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/ b/helix-core/src/test/java/com/linkedin/helix/store/
deleted file mode 100644
index 3ce19d2..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/
+++ /dev/null
@@ -1,37 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.testng.annotations.Test;
-import org.testng.AssertJUnit;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-public class TestPropertyStoreException
-  @Test (groups = {"unitTest"})
-  public void testPropertyStoreException()
-  {
-    PropertyStoreException exception = new PropertyStoreException("msg");
-    AssertJUnit.assertEquals(exception.getMessage(), "msg");
-    exception = new PropertyStoreException();
-    AssertJUnit.assertNull(exception.getMessage());
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/ b/helix-core/src/test/java/com/linkedin/helix/store/
deleted file mode 100644
index 6e12e94..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/
+++ /dev/null
@@ -1,89 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-public class TestPropertyStoreFactory extends ZkUnitTestBase
-  @Test()
-  public void testZkPropertyStoreFactory()
-  {
-    try
-    {
-      PropertyStoreFactory.<ZNRecord> getZKPropertyStore(null, null, null);
-"Should fail since zkAddr|serializer|root can't be null");
-    }
-    catch (IllegalArgumentException e)
-    {
-      // OK
-    }
-    final String rootNamespace = "TestPropertyStoreFactory";
-    PropertyJsonSerializer<ZNRecord> serializer =
-        new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
-    try
-    {
-      PropertyStoreFactory.<ZNRecord> getZKPropertyStore("localhost:1812", serializer, rootNamespace);
-"Should fail since zkAddr is not connectable");
-    }
-    catch (Exception e)
-    {
-      // OK
-    }
-    PropertyStore<ZNRecord> store =
-        PropertyStoreFactory.<ZNRecord> getZKPropertyStore(ZK_ADDR,
-                                                           serializer,
-                                                           rootNamespace);
-    Assert.assertNotNull(store);
-  }
-  @Test()
-  public void testFilePropertyStoreFactory()
-  {
-    final String rootNamespace = "/tmp/TestPropertyStoreFactory";
-    PropertyJsonSerializer<ZNRecord> serializer =
-        new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
-    PropertyJsonComparator<ZNRecord> comparator =
-        new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
-    PropertyStore<ZNRecord> store;
-    boolean exceptionCaught = false;
-    try
-    {
-      store = PropertyStoreFactory.<ZNRecord> getFilePropertyStore(null, null, null);
-    }
-    catch (IllegalArgumentException e)
-    {
-      exceptionCaught = true;
-    }
-    AssertJUnit.assertTrue(exceptionCaught);
-    store =
-        PropertyStoreFactory.<ZNRecord> getFilePropertyStore(serializer,
-                                                             rootNamespace,
-                                                             comparator);
-    Assert.assertNotNull(store);
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/ b/helix-core/src/test/java/com/linkedin/helix/store/
deleted file mode 100644
index 7507542..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/
+++ /dev/null
@@ -1,75 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.zk.ZkClient;
-public class TestZNRecordJsonSerializer extends ZkUnitTestBase
-  @Test
-  public void testZNRecordJsonSerializer() throws Exception
-  {
-    final String testRoot = getShortClassName();
-    System.out.println("START " + testRoot + " at " + new Date(System.currentTimeMillis()));
-    ZNRecord record = new ZNRecord("node1");
-    record.setSimpleField(ZNRecord.LIST_FIELD_BOUND, "" + 3);
-    List<String> list1 = Arrays.asList("one", "two", "three", "four");
-    List<String> list2 = Arrays.asList("a", "b", "c", "d");
-    List<String> list3 = Arrays.asList("x", "y");
-    record.setListField("list1", list1);
-    record.setListField("list2", list2);
-    record.setListField("list3", list3);
-    ZKPropertyStore<ZNRecord> store = new ZKPropertyStore<ZNRecord>(new ZkClient(ZK_ADDR),
-        new ZNRecordJsonSerializer(), "/" + testRoot);
-    store.setProperty("node1", record);
-    ZNRecord newRecord = store.getProperty("node1");
-    list1 = newRecord.getListField("list1");
-    Assert.assertTrue(list1.size() == 3);
-    Assert.assertTrue(list1.contains("one"));
-    Assert.assertTrue(list1.contains("two"));
-    Assert.assertTrue(list1.contains("three"));
-    list2 = newRecord.getListField("list2");
-    Assert.assertTrue(list2.size() == 3);
-    Assert.assertTrue(list2.contains("a"));
-    Assert.assertTrue(list2.contains("b"));
-    Assert.assertTrue(list2.contains("c"));
-    list3 = newRecord.getListField("list3");
-    Assert.assertTrue(list3.size() == 2);
-    Assert.assertTrue(list3.contains("x"));
-    Assert.assertTrue(list3.contains("y"));
-    System.out.println("END " + testRoot + " at " + new Date(System.currentTimeMillis()));
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/file/ b/helix-core/src/test/java/com/linkedin/helix/store/file/
deleted file mode 100644
index f011084..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/file/
+++ /dev/null
@@ -1,156 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Date;
-import java.util.List;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-public class TestFilePropertyStore
-  private static Logger logger = Logger.getLogger(TestFilePropertyStore.class);
-  private static final String rootNamespace = "/tmp/TestFilePropertyStore";
-  public class TestPropertyChangeListener implements PropertyChangeListener<String>
-  {
-    public boolean _propertyChangeReceived = false;
-    @Override
-    public void onPropertyChange(String key)
-    {
-"property changed at " + key);
-      _propertyChangeReceived = true;
-    }
-  }
-  public class TestUpdater implements DataUpdater<String>
-  {
-    @Override
-    public String update(String currentData)
-    {
-      return "new " + currentData;
-    }
-  }
-  @Test (groups = {"unitTest"})
-  public void testFilePropertyStore() throws Exception
-  {
-    System.out.println("START TestFilePropertyStore at " + new Date(System.currentTimeMillis()));
-    final int SLEEP_TIME = 2000;
-    PropertyJsonSerializer<String> serializer = new PropertyJsonSerializer<String>(String.class);
-    PropertyJsonComparator<String> comparator = new PropertyJsonComparator<String>(String.class);
-    FilePropertyStore<String> store = new FilePropertyStore<String>(serializer, rootNamespace,
-        comparator);
-    // store.removeRootNamespace();
-    // store.createRootNamespace();
-    store.start();
-    // test set
-    store.createPropertyNamespace("/child1");
-    store.setProperty("/child1/grandchild1", "grandchild1\n");
-    store.setProperty("/child1/grandchild2", "grandchild2\n");
-    store.createPropertyNamespace("/child1/grandchild3");
-    store.setProperty("/child1/grandchild3/grandgrandchild1", "grandgrandchild1\n");
-    // test get-names
-    List<String> names = store.getPropertyNames("/child1");
-    AssertJUnit.assertEquals(names.size(), 3);
-    AssertJUnit.assertTrue(names.contains("/child1/grandchild1"));
-    AssertJUnit.assertTrue(names.contains("/child1/grandchild2"));
-    AssertJUnit.assertTrue(names.contains("/child1/grandchild3/grandgrandchild1"));
-    // test get
-    String value = store.getProperty("nonExist");
-    AssertJUnit.assertEquals(value, null);
-    value = store.getProperty("/child1/grandchild2");
-    AssertJUnit.assertEquals(value, "grandchild2\n");
-    Thread.sleep(SLEEP_TIME);
-    // test subscribe
-    TestPropertyChangeListener listener1 = new TestPropertyChangeListener();
-    TestPropertyChangeListener listener2 = new TestPropertyChangeListener();
-    store.subscribeForPropertyChange("/child1", listener1);
-    store.subscribeForPropertyChange("/child1", listener1);
-    store.subscribeForPropertyChange("/child1", listener2);
-    store.setProperty("/child1/grandchild2", "grandchild2-new\n");
-    Thread.sleep(SLEEP_TIME);
-    AssertJUnit.assertEquals(listener1._propertyChangeReceived, true);
-    AssertJUnit.assertEquals(listener2._propertyChangeReceived, true);
-    listener1._propertyChangeReceived = false;
-    listener2._propertyChangeReceived = false;
-    // test unsubscribe
-    store.unsubscribeForPropertyChange("/child1", listener1);
-    store.setProperty("/child1/grandchild3/grandgrandchild1", "grandgrandchild1-new\n");
-    Thread.sleep(SLEEP_TIME);
-    AssertJUnit.assertEquals(listener1._propertyChangeReceived, false);
-    AssertJUnit.assertEquals(listener2._propertyChangeReceived, true);
-    listener2._propertyChangeReceived = false;
-    // test update property
-    store.updatePropertyUntilSucceed("child1/grandchild2", new TestUpdater());
-    value = store.getProperty("child1/grandchild2");
-    AssertJUnit.assertEquals("new grandchild2-new\n", value);
-    // test remove
-    store.removeProperty("/child1/grandchild2");
-    value = store.getProperty("/child1/grandchild2");
-    AssertJUnit.assertEquals(value, null);
-    Thread.sleep(SLEEP_TIME);
-    AssertJUnit.assertEquals(listener2._propertyChangeReceived, true);
-    listener2._propertyChangeReceived = false;
-    // test compare and set
-    boolean success = store.compareAndSet("/child1/grandchild1", "grandchild1-old\n",
-                                          "grandchild1-new\n", comparator);
-    AssertJUnit.assertEquals(success, false);
-    success = store.compareAndSet("/child1/grandchild1", "grandchild1\n",
-                                  "grandchild1-new\n", comparator);
-    AssertJUnit.assertEquals(success, true);
-    store.stop();
-    // test stop
-    listener2._propertyChangeReceived = false;
-    store.setProperty("/child1/grandchild3/grandgrandchild1", "grandgrandchild1-new-new\n");
-    Thread.sleep(SLEEP_TIME);
-    AssertJUnit.assertEquals(listener2._propertyChangeReceived, false);
-    store.unsubscribeForPropertyChange("/child1", listener2);
-    // store.stop();
-    System.out.println("END TestFilePropertyStore at " + new Date(System.currentTimeMillis()));
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/zk/ b/helix-core/src/test/java/com/linkedin/helix/store/zk/
deleted file mode 100644
index 2df2317..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/zk/
+++ /dev/null
@@ -1,536 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.zk.ByteArraySerializer;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-// TODO need to write performance test for zk-property store
-public class TestZKPropertyStore extends ZkUnitTestBase
-  private static final Logger LOG = Logger.getLogger(TestZKPropertyStore.class);
-  final String className = getShortClassName();
-  final int bufSize = 128;
-  final int mapNr = 10;
-  final int firstLevelNr = 10;
-  final int secondLevelNr = 10;
-  final int totalNodes = firstLevelNr * secondLevelNr;
-  class TestListener implements PropertyChangeListener<ZNRecord>
-  {
-    Map<String, String> _keySet;
-    public TestListener(Map<String, String> keySet)
-    {
-      _keySet = keySet;
-    }
-    @Override
-    public void onPropertyChange(String key)
-    {
-      long now = System.currentTimeMillis();
-      _keySet.put(key, Long.toString(now));
-    }
-  }
-  private class TestUpdater implements DataUpdater<ZNRecord>
-  {
-    @Override
-    public ZNRecord update(ZNRecord current)
-    {
-      char[] data = new char[bufSize];
-      for (int i = 0; i < bufSize; i++)
-      {
-        data[i] = 'e';
-      }
-      Map<String, String> map = new TreeMap<String, String>();
-      for (int i = 0; i < mapNr; i++)
-      {
-        map.put("key_" + i, new String(data));
-      }
-      String nodeId = current.getId();
-      ZNRecord record = new ZNRecord(nodeId);
-      record.setSimpleFields(map);
-      return record;
-    }
-  }
-  String getNodeId(int i, int j)
-  {
-    return "childNode_" + i + "_" + j;
-  }
-  String getSecondLevelKey(int i, int j)
-  {
-    return "/node_" + i + "/" + getNodeId(i, j);
-  }
-  String getFirstLevelKey(int i)
-  {
-    return "/node_" + i;
-  }
-  // TODO: separate into small tests
-  @Test()
-  public void testZKPropertyStore() throws Exception
-  {
-    System.out.println("START " + className + " at "
-        + new Date(System.currentTimeMillis()));
-    //"number of connections is " + ZkClient.getNumberOfConnections());
-    // clean up zk
-    final String propertyStoreRoot = buildPropertyStoreRoot();
-    if (_gZkClient.exists(propertyStoreRoot))
-    {
-      _gZkClient.deleteRecursive(propertyStoreRoot);
-    }
-    ZkClient zkclient = new ZkClient(ZK_ADDR);
-    zkclient.setZkSerializer(new ByteArraySerializer());
-    ZKPropertyStore<ZNRecord> store =
-        new ZKPropertyStore<ZNRecord>(zkclient,
-                                      new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
-                                      propertyStoreRoot);
-    // test back to back add-delete-add
-    store.setProperty("child0", new ZNRecord("child0"));
-    ZNRecord record2 = store.getProperty("child0"); // will put the record in cache
-    String child0Path = propertyStoreRoot + "/child0";
-    _gZkClient.subscribeDataChanges(child0Path, new IZkDataListener()
-    {
-      @Override
-      public void handleDataDeleted(String dataPath) throws Exception
-      {
-        // TODO Auto-generated method stub
-        System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataDeleted()");
-      }
-      @Override
-      public void handleDataChange(String dataPath, Object data) throws Exception
-      {
-        // TODO Auto-generated method stub
-        System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataChange()");
-      }
-    });
-    for (int i = 0; i < 2; i++)
-    {
-      _gZkClient.delete(child0Path);
-      _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new"));
-    }
-    record2 = store.getProperty("child0");
-    Assert.assertEquals(record2.getId(), "child0-new");
-    _gZkClient.delete(child0Path);
-    Thread.sleep(300); // should wait for zk callback to remove "child0" from cache
-    record2 = store.getProperty("child0");
-    Assert.assertNull(record2);
-    // zookeeper has a default 1M limit on size
-    char[] data = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = 'a';
-    }
-    Map<String, String> map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    String node = "node";
-    ZNRecord record = new ZNRecord(node);
-    record.setSimpleFields(map);
-    ZNRecordSerializer serializer = new ZNRecordSerializer();
-    int bytesPerNode = serializer.serialize(record).length;
-    System.out.println("use znode of size " + bytesPerNode / 1024 + "K");
-    Assert.assertTrue(bytesPerNode < 1024 * 1024,
-                      "zookeeper has a default 1M limit on size");
-    // test getPropertyRootNamespace()
-    String root = store.getPropertyRootNamespace();
-    Assert.assertEquals(root, propertyStoreRoot);
-    // set 100 nodes and get 100 nodes, verify get what we set
-    long start = System.currentTimeMillis();
-    setNodes(store, 'a', false);
-    long end = System.currentTimeMillis();
-    System.out.println("ZKPropertyStore write throughput is " + bytesPerNode * totalNodes
-        / (end - start) + " kilo-bytes per second");
-    start = System.currentTimeMillis();
-    for (int i = 0; i < 10; i++)
-    {
-      for (int j = 0; j < 10; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        String key = getSecondLevelKey(i, j);
-        record = store.getProperty(key);
-        Assert.assertEquals(record.getId(), nodeId);
-      }
-    }
-    end = System.currentTimeMillis();
-    System.out.println("ZKPropertyStore read throughput is " + bytesPerNode * totalNodes
-        / (end - start) + " kilo-bytes per second");
-    // test subscribe
-    Map<String, String> keyMap = new TreeMap<String, String>();
-    // verify initial callbacks invoked for all 100 nodes
-    PropertyChangeListener<ZNRecord> listener = new TestListener(keyMap);
-    store.subscribeForPropertyChange("", listener);
-    System.out.println("keyMap size: " + keyMap.size());
-    Assert.assertTrue(keyMap.size() > 100);
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String key = getSecondLevelKey(i, j);
-        Assert.assertTrue(keyMap.containsKey(key));
-      }
-    }
-    // change nodes via property store interface
-    // and verify all notifications have been received (TODO: without latency?)
-    start = System.currentTimeMillis();
-    keyMap.clear();
-    setNodes(store, 'b', true);
-    // wait for all callbacks completed
-    for (int i = 0; i < 10; i++)
-    {
-      System.out.println("keySet size: " + keyMap.size());
-      if (keyMap.size() == totalNodes)
-      {
-        break;
-      }
-      Thread.sleep(500);
-    }
-    Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
-        + " callbacks");
-    end = System.currentTimeMillis();
-    long waitTime = (end - start) * 2;
-    long maxLatency = 0;
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String key = getSecondLevelKey(i, j);
-        Assert.assertTrue(keyMap.containsKey(key));
-        record = store.getProperty(key);
-        start = Long.parseLong(record.getSimpleField("SetTimestamp"));
-        end = Long.parseLong(keyMap.get(key));
-        long latency = end - start;
-        if (latency > maxLatency)
-        {
-          maxLatency = latency;
-        }
-      }
-    }
-    System.out.println("ZKPropertyStore callback latency is " + maxLatency
-        + " millisecond");
-    // change nodes via native zkclient interface
-    // and verify all notifications have been received with some latency
-    keyMap.clear();
-    setNodes(_gZkClient, propertyStoreRoot, 'a', true);
-    // wait for all callbacks completed
-    Thread.sleep(waitTime);
-    Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
-        + " callbacks");
-    // remove node via native zkclient interface
-    // should receive callbacks on parent key
-    keyMap.clear();
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      int j = 0;
-      String key = getSecondLevelKey(i, j);
-      _gZkClient.delete(propertyStoreRoot + key);
-    }
-    Thread.sleep(waitTime);
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      String key = getFirstLevelKey(i);
-      Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
-    }
-    keyMap.clear();
-    for (int j = 1; j < secondLevelNr; j++)
-    {
-      int i = 0;
-      String key = getSecondLevelKey(i, j);
-      _gZkClient.delete(propertyStoreRoot + key);
-    }
-    Thread.sleep(waitTime);
-    String node0Key = getFirstLevelKey(0);
-    Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
-        + node0Key);
-    // add back removed nodes
-    // should receive callbacks on parent key
-    keyMap.clear();
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = 'a';
-    }
-    map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      int j = 0;
-      String nodeId = getNodeId(i, j);
-      String key = getSecondLevelKey(i, j);
-      record = new ZNRecord(nodeId);
-      record.setSimpleFields(map);
-      store.setProperty(key, record);
-    }
-    Thread.sleep(waitTime);
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      String key = getFirstLevelKey(i);
-      Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
-    }
-    keyMap.clear();
-    for (int j = 1; j < secondLevelNr; j++)
-    {
-      int i = 0;
-      String nodeId = getNodeId(i, j);
-      String key = getSecondLevelKey(i, j);
-      record = new ZNRecord(nodeId);
-      record.setSimpleFields(map);
-      store.setProperty(key, record);
-    }
-    Thread.sleep(waitTime);
-    node0Key = getFirstLevelKey(0);
-    Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
-        + node0Key);
-    // test unsubscribe
-    store.unsubscribeForPropertyChange("", listener);
-    // change all nodes and verify no notification happens
-    keyMap.clear();
-    setNodes(store, 'c', false);
-    Thread.sleep(waitTime);
-    Assert.assertEquals(keyMap.size(), 0);
-    // test getPropertyNames
-    List<String> names = store.getPropertyNames("");
-    int cnt = 0;
-    for (String name : names)
-    {
-      int i = cnt / 10;
-      int j = cnt % 10;
-      cnt++;
-      String key = getSecondLevelKey(i, j);
-      Assert.assertEquals(name, key);
-    }
-    // test compare and set
-    char[] updateData = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = 'c';
-      updateData[i] = 'd';
-    }
-    Map<String, String> updateMap = new TreeMap<String, String>();
-    for (int i = 0; i < 10; i++)
-    {
-      map.put("key_" + i, new String(data));
-      updateMap.put("key_" + i, new String(updateData));
-    }
-    PropertyJsonComparator<ZNRecord> comparator =
-        new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        record = new ZNRecord(nodeId);
-        record.setSimpleFields(map);
-        String key = getSecondLevelKey(i, j);
-        ZNRecord update = new ZNRecord(nodeId);
-        update.setSimpleFields(updateMap);
-        boolean succeed = store.compareAndSet(key, record, update, comparator);
-        Assert.assertTrue(succeed);
-        record = store.getProperty(key);
-        Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'd');
-      }
-    }
-    // test updateUntilSucceed
-    TestUpdater updater = new TestUpdater();
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String key = getSecondLevelKey(i, j);
-        store.updatePropertyUntilSucceed(key, updater);
-        record = store.getProperty(key);
-        Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'e');
-      }
-    }
-    // test exist
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String key = getSecondLevelKey(i, j);
-        boolean exist = store.exists(key);
-        Assert.assertTrue(exist);
-      }
-    }
-    // test removeProperty
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      int j = 0;
-      String key = getSecondLevelKey(i, j);
-      store.removeProperty(key);
-      record = store.getProperty(key);
-      Assert.assertNull(record);
-    }
-    // test removePropertyNamespace
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      String key = "/node_" + i;
-      store.removeNamespace(key);
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String key = getSecondLevelKey(i, j);
-        store.removeProperty(key);
-        record = store.getProperty(key);
-        Assert.assertNull(record);
-      }
-    }
-    System.out.println("END " + className + " at " + new Date(System.currentTimeMillis()));
-  }
-  private String buildPropertyStoreRoot()
-  {
-    return "/" + className;
-  }
-  private void setNodes(ZKPropertyStore<ZNRecord> store, char c, boolean needTimestamp) throws PropertyStoreException
-  {
-    char[] data = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = c;
-    }
-    Map<String, String> map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        ZNRecord record = new ZNRecord(nodeId);
-        record.setSimpleFields(map);
-        if (needTimestamp)
-        {
-          long now = System.currentTimeMillis();
-          record.setSimpleField("SetTimestamp", Long.toString(now));
-        }
-        String key = getSecondLevelKey(i, j);
-        store.setProperty(key, record);
-      }
-    }
-  }
-  private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp) throws PropertyStoreException
-  {
-    char[] data = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = c;
-    }
-    Map<String, String> map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        ZNRecord record = new ZNRecord(nodeId);
-        record.setSimpleFields(map);
-        if (needTimestamp)
-        {
-          long now = System.currentTimeMillis();
-          record.setSimpleField("SetTimestamp", Long.toString(now));
-        }
-        String key = getSecondLevelKey(i, j);
-        zkClient.writeData(root + key, record);
-      }
-    }
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/zk/ b/helix-core/src/test/java/com/linkedin/helix/store/zk/
deleted file mode 100644
index 2df5242..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/zk/
+++ /dev/null
@@ -1,156 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.zk.ZkClient;
-public class TestZKPropertyStoreMultiThread extends ZkUnitTestBase
-  private static final Logger LOG = Logger.getLogger(TestZKPropertyStoreMultiThread.class);
-  private final String _root = "/" + getShortClassName();
-  private final ZNRecord _record = new ZNRecord("key1");
-  @BeforeClass()
-  public void beforeClass()
-  {
-    if (_gZkClient.exists(_root))
-    {
-      _gZkClient.deleteRecursive(_root);
-    }
-  }
-  public class TestCallableCAS implements Callable<Boolean>
-  {
-    @Override
-    public Boolean call()
-    {
-      try
-      {
-        ZKPropertyStore<ZNRecord> store
-          = new ZKPropertyStore<ZNRecord>(new ZkClient(ZK_ADDR),
-                                          new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
-                                          _root);
-        PropertyJsonComparator<ZNRecord> comparator = new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
-        long id = Thread.currentThread().getId();
-        store.setProperty("key1", _record);
-        boolean success;
-        do
-        {
-          ZNRecord current = store.getProperty("key1");
-          ZNRecord update = new ZNRecord(current);
-          update.setSimpleField("thread_" + id, "simpleValue");
-          success = store.compareAndSet("key1", current, update, comparator);
-        } while (!success);
-        return Boolean.TRUE;
-      }
-      catch (Exception e)
-      {
-        LOG.error(e);
-        return Boolean.FALSE;
-      }
-    }
-  }
-  @Test ()
-  public void testCmpAndSet()
-  {
-    System.out.println("START testCmpAndSet at" + new Date(System.currentTimeMillis()));
-    Map<String, Boolean> results = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableCAS(), 10);
-    Assert.assertEquals(results.size(), 5);
-    for (Boolean result : results.values())
-    {
-      Assert.assertTrue(result.booleanValue());
-    }
-    System.out.println("END testCmpAndSet at" + new Date(System.currentTimeMillis()));
-  }
-  private class TestUpdater implements DataUpdater<ZNRecord>
-  {
-    @Override
-    public ZNRecord update(ZNRecord current)
-    {
-      long id = Thread.currentThread().getId();
-      current.setSimpleField("thread_" + id, "simpleValue");
-      return current;
-    }
-  }
-  public class TestCallableUpdate implements Callable<Boolean>
-  {
-    @Override
-    public Boolean call()
-    {
-      try
-      {
-        ZKPropertyStore<ZNRecord> store
-          = new ZKPropertyStore<ZNRecord>(new ZkClient(ZK_ADDR),
-                                          new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
-                                          _root);
-        store.setProperty("key2", _record);
-        store.updatePropertyUntilSucceed("key2", new TestUpdater());
-        return Boolean.TRUE;
-      }
-      catch (Exception e)
-      {
-        LOG.error(e);
-        return Boolean.FALSE;
-      }
-    }
-  }
-  @Test()
-  public void testUpdateUntilSucceed()
-  {
-    System.out.println("START testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
-    Map<String, Boolean> results = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableUpdate(), 10);
-    Assert.assertEquals(results.size(), 5);
-    for (Boolean result : results.values())
-    {
-      Assert.assertTrue(result.booleanValue());
-    }
-    System.out.println("END testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/zk/ b/helix-core/src/test/java/com/linkedin/helix/store/zk/
deleted file mode 100644
index 388c069..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/zk/
+++ /dev/null
@@ -1,368 +0,0 @@
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-public class TestZkHelixPropertyStore extends ZkUnitTestBase
-  final String _root         = "/" + getShortClassName();
-  final int    bufSize       = 128;
-  final int    mapNr         = 10;
-  final int    firstLevelNr  = 10;
-  final int    secondLevelNr = 10;
-  // final int totalNodes = firstLevelNr * secondLevelNr;
-  class TestListener implements HelixPropertyListener
-  {
-    Map<String, Long> _changeKeys = new HashMap<String, Long>();
-    Map<String, Long> _createKeys = new HashMap<String, Long>();
-    Map<String, Long> _deleteKeys = new HashMap<String, Long>();
-    public void reset()
-    {
-      _changeKeys.clear();
-      _createKeys.clear();
-      _deleteKeys.clear();
-    }
-    @Override
-    public void onDataChange(String path)
-    {
-      _changeKeys.put(path, new Long(System.currentTimeMillis()));
-    }
-    @Override
-    public void onDataCreate(String path)
-    {
-      _createKeys.put(path, new Long(System.currentTimeMillis()));
-    }
-    @Override
-    public void onDataDelete(String path)
-    {
-      _deleteKeys.put(path, new Long(System.currentTimeMillis()));
-    }
-  }
-  @Test
-  public void testSet()
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    System.out.println("START testSet() at " + new Date(System.currentTimeMillis()));
-    String subRoot = _root + "/" + "set";
-    List<String> subscribedPaths = new ArrayList<String>();
-    subscribedPaths.add(subRoot);
-    ZkHelixPropertyStore<ZNRecord> store =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
-                                           subRoot,
-                                           subscribedPaths);
-    setNodes(store, 'a', false);
-    for (int i = 0; i < 10; i++)
-    {
-      for (int j = 0; j < 10; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        String key = getSecondLevelKey(i, j);
-        ZNRecord record = store.get(key, null, 0);
-        Assert.assertEquals(record.getId(), nodeId);
-      }
-    }
-    long startT = System.currentTimeMillis();
-    for (int i = 0; i < 1000; i++)
-    {
-      ZNRecord record = store.get("/node_0/childNode_0_0", null, 0);
-      Assert.assertNotNull(record);
-    }
-    long endT = System.currentTimeMillis();
-    System.out.println("1000 Get() time used: " + (endT - startT) + "ms");
-    Assert.assertTrue((endT - startT) < 60, "1000 Gets should be finished within 50ms");
-    store.stop();
-    System.out.println("END testSet() at " + new Date(System.currentTimeMillis()));
-  }
-  @Test
-  public void testLocalTriggeredCallback() throws Exception
-  {
-    System.out.println("START testLocalTriggeredCallback() at "
-        + new Date(System.currentTimeMillis()));
-    String subRoot = _root + "/" + "localCallback";
-    List<String> subscribedPaths = new ArrayList<String>();
-    subscribedPaths.add(subRoot);
-    ZkHelixPropertyStore<ZNRecord> store =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
-                                           subRoot,
-                                           subscribedPaths);
-    // change nodes via property store interface
-    // and verify all notifications have been received
-    TestListener listener = new TestListener();
-    store.subscribe("/", listener);
-    // test dataCreate callbacks
-    listener.reset();
-    setNodes(store, 'a', true);
-    // wait until all callbacks have been received
-    Thread.sleep(500);
-    int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
-        + expectCreateNodes + " create callbacks");
-    // test dataChange callbacks
-    listener.reset();
-    setNodes(store, 'b', true);
-    // wait until all callbacks have been received
-    Thread.sleep(500);
-    int expectChangeNodes = firstLevelNr * secondLevelNr;
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
-                      "Should receive at least " + expectChangeNodes
-                          + " change callbacks");
-    // test delete callbacks
-    listener.reset();
-    int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
-    store.remove("/", 0);
-    // wait until all callbacks have been received
-    Thread.sleep(500);
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
-        + expectDeleteNodes + " delete callbacks");
-    store.stop();
-    System.out.println("END testLocalTriggeredCallback() at "
-        + new Date(System.currentTimeMillis()));
-  }
-  @Test
-  public void testZkTriggeredCallback() throws Exception
-  {
-    System.out.println("START testZkTriggeredCallback() at "
-        + new Date(System.currentTimeMillis()));
-    String subRoot = _root + "/" + "zkCallback";
-    List<String> subscribedPaths = Arrays.asList(subRoot);
-    ZkHelixPropertyStore<ZNRecord> store =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
-                                           subRoot,
-                                           subscribedPaths);
-    // change nodes via property store interface
-    // and verify all notifications have been received
-    TestListener listener = new TestListener();
-    store.subscribe("/", listener);
-    // test create callbacks
-    listener.reset();
-    setNodes(_gZkClient, subRoot, 'a', true);
-    int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
-    Thread.sleep(500);
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
-        + expectCreateNodes + " create callbacks");
-    // test change callbacks
-    listener.reset();
-    setNodes(_gZkClient, subRoot, 'b', true);
-    int expectChangeNodes = firstLevelNr * secondLevelNr;
-    Thread.sleep(500);
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
-                      "Should receive at least " + expectChangeNodes
-                          + " change callbacks");
-    // test delete callbacks
-    listener.reset();
-    int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
-    _gZkClient.deleteRecursive(subRoot);
-    Thread.sleep(1000);
-    System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
-        + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
-    Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
-        + expectDeleteNodes + " delete callbacks");
-    store.stop();
-    System.out.println("END testZkTriggeredCallback() at "
-        + new Date(System.currentTimeMillis()));
-  }
-  @Test
-  public void testBackToBackRemoveAndSet() throws Exception
-  {
-    System.out.println("START testBackToBackRemoveAndSet() at "
-        + new Date(System.currentTimeMillis()));
-    String subRoot = _root + "/" + "backToBackRemoveAndSet";
-    List<String> subscribedPaths = new ArrayList<String>();
-    subscribedPaths.add(subRoot);
-    ZkHelixPropertyStore<ZNRecord> store =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
-                                           subRoot,
-                                           subscribedPaths);
-    store.set("/child0", new ZNRecord("child0"), AccessOption.PERSISTENT);
-    ZNRecord record = store.get("/child0", null, 0); // will put the record in cache
-    Assert.assertEquals(record.getId(), "child0");
-    // System.out.println("1:get:" + record);
-    String child0Path = subRoot + "/child0";
-    for (int i = 0; i < 2; i++)
-    {
-      _gZkClient.delete(child0Path);
-      _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new-" + i));
-    }
-    Thread.sleep(500); // should wait for zk callback to add "/child0" into cache
-    record = store.get("/child0", null, 0);
-    Assert.assertEquals(record.getId(),
-                        "child0-new-1",
-                        "Cache shoulde be updated to latest create");
-    // System.out.println("2:get:" + record);
-    _gZkClient.delete(child0Path);
-    Thread.sleep(500); // should wait for zk callback to remove "/child0" from cache
-    try
-    {
-      record = store.get("/child0", null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-"/child0 should have been removed");
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK.
-    }
-    // System.out.println("3:get:" + record);
-    store.stop();
-    System.out.println("END testBackToBackRemoveAndSet() at "
-        + new Date(System.currentTimeMillis()));
-  }
-  private String getNodeId(int i, int j)
-  {
-    return "childNode_" + i + "_" + j;
-  }
-  private String getSecondLevelKey(int i, int j)
-  {
-    return "/node_" + i + "/" + getNodeId(i, j);
-  }
-  private String getFirstLevelKey(int i)
-  {
-    return "/node_" + i;
-  }
-  private void setNodes(ZkHelixPropertyStore<ZNRecord> store,
-                        char c,
-                        boolean needTimestamp)
-  {
-    char[] data = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = c;
-    }
-    Map<String, String> map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        ZNRecord record = new ZNRecord(nodeId);
-        record.setSimpleFields(map);
-        if (needTimestamp)
-        {
-          long now = System.currentTimeMillis();
-          record.setSimpleField("SetTimestamp", Long.toString(now));
-        }
-        String key = getSecondLevelKey(i, j);
-        store.set(key, record, AccessOption.PERSISTENT);
-      }
-    }
-  }
-  private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp)
-  {
-    char[] data = new char[bufSize];
-    for (int i = 0; i < bufSize; i++)
-    {
-      data[i] = c;
-    }
-    Map<String, String> map = new TreeMap<String, String>();
-    for (int i = 0; i < mapNr; i++)
-    {
-      map.put("key_" + i, new String(data));
-    }
-    for (int i = 0; i < firstLevelNr; i++)
-    {
-      String firstLevelKey = getFirstLevelKey(i);
-      for (int j = 0; j < secondLevelNr; j++)
-      {
-        String nodeId = getNodeId(i, j);
-        ZNRecord record = new ZNRecord(nodeId);
-        record.setSimpleFields(map);
-        if (needTimestamp)
-        {
-          long now = System.currentTimeMillis();
-          record.setSimpleField("SetTimestamp", Long.toString(now));
-        }
-        String key = getSecondLevelKey(i, j);
-        try
-        {
-          zkClient.writeData(root + key, record);
-        }
-        catch (ZkNoNodeException e)
-        {
-          zkClient.createPersistent(root + firstLevelKey, true);
-          zkClient.createPersistent(root + key, record);
-        }
-      }
-    }
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/store/zk/ b/helix-core/src/test/java/com/linkedin/helix/store/zk/
deleted file mode 100644
index cde0f8e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/store/zk/
+++ /dev/null
@@ -1,106 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Date;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-public class TestZkPropertyStoreSessionExpiry extends ZkUnitTestBase
-  private static final Logger LOG = Logger.getLogger(TestZkPropertyStoreSessionExpiry.class);
-  private class TestPropertyChangeListener
-  implements PropertyChangeListener<String>
-  {
-    public boolean _propertyChangeReceived = false;
-    @Override
-    public void onPropertyChange(String key)
-    {
-      // TODO Auto-generated method stub
-"property change, " + key);
-      _propertyChangeReceived = true;
-    }
-  }
-	ZkClient _zkClient;
-	@BeforeClass
-	public void beforeClass()
-	{
-		_zkClient = new ZkClient(ZK_ADDR);
-		_zkClient.setZkSerializer(new ZNRecordSerializer());
-	}
-	@AfterClass
-	public void afterClass()
-	{
-		_zkClient.close();
-	}
-  @Test()
-  public void testZkPropertyStoreSessionExpiry() throws Exception
-  {
-"START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-    PropertyJsonSerializer<String> serializer = new PropertyJsonSerializer<String>(String.class);
-    ZkConnection zkConn = new ZkConnection(ZK_ADDR);
-    final String propertyStoreRoot = "/" + getShortClassName();
-    if (_zkClient.exists(propertyStoreRoot))
-    {
-      _zkClient.deleteRecursive(propertyStoreRoot);
-    }
-    ZKPropertyStore<String> zkPropertyStore 
-      = new ZKPropertyStore<String>(new ZkClient(zkConn), serializer, propertyStoreRoot);
-    zkPropertyStore.setProperty("/child1/grandchild1", "grandchild1");
-    zkPropertyStore.setProperty("/child1/grandchild2", "grandchild2");
-    TestPropertyChangeListener listener = new TestPropertyChangeListener();
-    zkPropertyStore.subscribeForPropertyChange("", listener);
-    listener._propertyChangeReceived = false;
-    zkPropertyStore.setProperty("/child2/grandchild3", "grandchild3");
-    Thread.sleep(100);
-    AssertJUnit.assertEquals(listener._propertyChangeReceived, true);
-    simulateSessionExpiry(zkConn);
-    listener._propertyChangeReceived = false;
-    zkPropertyStore.setProperty("/child2/grandchild4", "grandchild4");
-    Thread.sleep(100);
-    AssertJUnit.assertEquals(listener._propertyChangeReceived, true);
-"END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-  }
diff --git a/helix-core/src/test/java/com/linkedin/helix/tools/ b/helix-core/src/test/java/com/linkedin/helix/tools/
deleted file mode 100644
index f3aa399..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/tools/
+++ /dev/null
@@ -1,494 +0,0 @@
- * Copyright (C) 2012 LinkedIn Inc <>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Arrays;
-import java.util.Date;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.LiveInstance;
-public class TestClusterSetup extends ZkUnitTestBase
-  private static Logger         LOG             =
-                                                    Logger.getLogger(TestClusterSetup.class);
-  protected static final String CLUSTER_NAME    = "TestClusterSetup";
-  protected static final String TEST_DB         = "TestDB";
-  protected static final String INSTANCE_PREFIX = "instance:";
-  protected static final String STATE_MODEL     = "MasterSlave";
-  protected static final String TEST_NODE       = "testnode:1";
-  ZkClient                      _zkClient;
-  ClusterSetup                  _clusterSetup;
-  String instanceColonToUnderscoreFormat(String colonFormat)
-  {
-    int lastPos = colonFormat.lastIndexOf(":");
-    if (lastPos <= 0)
-    {
-      String error = "Invalid storage Instance info format: " + colonFormat;
-      LOG.warn(error);
-      throw new HelixException(error);
-    }
-    String host = colonFormat.substring(0, lastPos);
-    String portStr = colonFormat.substring(lastPos + 1);
-    return host + "_" + portStr;
-  }
-  private static String[] createArgs(String str)
-  {
-    String[] split = str.split("[ ]+");
-    System.out.println(Arrays.toString(split));
-    return split;
-  }
-  @BeforeClass()
-  public void beforeClass() throws IOException,
-      Exception
-  {
-    System.out.println("START TestClusterSetup.beforeClass() "
-        + new Date(System.currentTimeMillis()));
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-  @AfterClass()
-  public void afterClass()
-  {
-    _zkClient.close();
-    System.out.println("END TestClusterSetup.afterClass() "
-        + new Date(System.currentTimeMillis()));
-  }
-  @BeforeMethod()
-  public void setup()
-  {
-    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
-    _clusterSetup = new ClusterSetup(ZK_ADDR);
-    _clusterSetup.addCluster(CLUSTER_NAME, true);
-  }
-  @Test()
-  public void testAddInstancesToCluster() throws Exception
-  {
-    String instanceAddresses[] = new String[3];
-    for (int i = 0; i < 3; i++)
-    {
-      String currInstance = INSTANCE_PREFIX + i;
-      instanceAddresses[i] = currInstance;
-    }
-    String nextInstanceAddress = INSTANCE_PREFIX + 3;
-    _clusterSetup.addInstancesToCluster(CLUSTER_NAME, instanceAddresses);
-    // verify instances
-    for (String instance : instanceAddresses)
-    {
-      verifyInstance(_zkClient,
-                     CLUSTER_NAME,
-                     instanceColonToUnderscoreFormat(instance),
-                     true);
-    }
-    _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
-    verifyInstance(_zkClient,
-                   CLUSTER_NAME,
-                   instanceColonToUnderscoreFormat(nextInstanceAddress),
-                   true);
-    // re-add
-    boolean caughtException = false;
-    try
-    {
-      _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-    // bad instance format
-    String badFormatInstance = "badinstance";
-    caughtException = false;
-    try
-    {
-      _clusterSetup.addInstanceToCluster(CLUSTER_NAME, badFormatInstance);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-  }
-  @Test()
-  public void testDisableDropInstancesFromCluster() throws Exception
-  {
-    testAddInstancesToCluster();
-    String instanceAddresses[] = new String[3];
-    for (int i = 0; i < 3; i++)
-    {
-      String currInstance = INSTANCE_PREFIX + i;
-      instanceAddresses[i] = currInstance;
-    }
-    String nextInstanceAddress = INSTANCE_PREFIX + 3;
-    boolean caughtException = false;
-    // drop without disabling
-    try
-    {
-      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-    // disable
-    _clusterSetup.getClusterManagementTool()
-                 .enableInstance(CLUSTER_NAME,
-                                 instanceColonToUnderscoreFormat(nextInstanceAddress),
-                                 false);
-    verifyEnabled(_zkClient,
-                  CLUSTER_NAME,
-                  instanceColonToUnderscoreFormat(nextInstanceAddress),
-                  false);
-    // drop
-    _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
-    verifyInstance(_zkClient,
-                   CLUSTER_NAME,
-                   instanceColonToUnderscoreFormat(nextInstanceAddress),
-                   false);
-    // re-drop
-    caughtException = false;
-    try
-    {
-      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-    /*
-     * //drop a set _clusterSetup.getClusterManagementTool().enableInstances(CLUSTER_NAME,
-     * instanceAddresses, false); _clusterSetup.dropInstancesFromCluster(CLUSTER_NAME,
-     * instanceAddresses);
-     */
-    // bad format disable, drop
-    String badFormatInstance = "badinstance";
-    caughtException = false;
-    try
-    {
-      _clusterSetup.getClusterManagementTool().enableInstance(CLUSTER_NAME,
-                                                              badFormatInstance,
-                                                              false);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-    caughtException = false;
-    try
-    {
-      _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, badFormatInstance);
-    }
-    catch (HelixException e)
-    {
-      caughtException = true;
-    }
-    AssertJUnit.assertTrue(caughtException);
-  }
-  @Test()
-  public void testAddResource() throws Exception
-  {
-    try
-    {
-      _clusterSetup.addResourceToCluster(CLUSTER_NAME, TEST_DB, 16, STATE_MODEL);
-    }
-    catch(Exception e)
-    {}
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
-  }
-  @Test()
-  public void testRemoveResource() throws Exception
-  {
-    _clusterSetup.setupTestCluster(CLUSTER_NAME);
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
-    _clusterSetup.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, false);
-  }
-  @Test()
-  public void testRebalanceCluster() throws Exception
-  {
-    _clusterSetup.setupTestCluster(CLUSTER_NAME);
-    // testAddInstancesToCluster();
-    testAddResource();
-    _clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 4);
-    verifyReplication(_zkClient, CLUSTER_NAME, TEST_DB, 4);
-  }
-  /*
-   * @Test (groups = {"unitTest"}) public void testPrintUsage() throws Exception { Options
-   * cliOptions = ClusterSetup.constructCommandLineOptions();
-   * ClusterSetup.printUsage(null); }
-   */
-  @Test()
-  public void testParseCommandLinesArgs() throws Exception
-  {
-    // ClusterSetup
-    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+ " help"));
-    // wipe ZK
-    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
-    _clusterSetup = new ClusterSetup(ZK_ADDR);
-    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addCluster "
-        + CLUSTER_NAME));
-    // wipe again
-    _zkClient.deleteRecursive("/" + CLUSTER_NAME);
-    _clusterSetup = new ClusterSetup(ZK_ADDR);
-    _clusterSetup.setupTestCluster(CLUSTER_NAME);
-    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addNode "
-        + CLUSTER_NAME + " " + TEST_NODE));
-    verifyInstance(_zkClient,
-                   CLUSTER_NAME,
-                   instanceColonToUnderscoreFormat(TEST_NODE),
-                   true);
-    try
-    {
-      ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
-        + " --addResource " + CLUSTER_NAME + " " + TEST_DB + " 4 " + STATE_MODEL));
-    }
-    catch(Exception e)
-    {
-    }
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
-    // ClusterSetup
-    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --addNode node-1"));
-    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
-        + " --enableInstance " + CLUSTER_NAME + " "
-        + instanceColonToUnderscoreFormat(TEST_NODE) + " true"));
-    verifyEnabled(_zkClient,
-                  CLUSTER_NAME,
-                  instanceColonToUnderscoreFormat(TEST_NODE),
-                  true);
-    // TODO: verify list commands
-    /*
-     * ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusterInfo "
-     * +CLUSTER_NAME)); ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusters"));
-     * ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listInstanceInfo "
-     * +CLUSTER_NAME+" "+instanceColonToUnderscoreFormat(TEST_NODE))); ClusterSetup
-     * .processCommandLineArgs
-     * (createArgs("-zkSvr "+ZK_ADDR+" --listInstances "+CLUSTER_NAME)); ClusterSetup
-     * .processCommandLineArgs
-     * (createArgs("-zkSvr "+ZK_ADDR+" --listResourceInfo "+CLUSTER_NAME +" "+TEST_DB));
-     * ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listResources "
-     * +CLUSTER_NAME)); ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listStateModel "
-     * +CLUSTER_NAME+" "+STATE_MODEL)); ClusterSetup
-     * .processCommandLineArgs(createArgs("-zkSvr "
-     * +ZK_ADDR+" --listStateModels "+CLUSTER_NAME));
-     */
-    // ClusterSetup
-    // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --rebalance "+CLUSTER_NAME+" "+TEST_DB+" 1"));
-    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
-        + " --enableInstance " + CLUSTER_NAME + " "
-        + instanceColonToUnderscoreFormat(TEST_NODE) + " false"));
-    verifyEnabled(_zkClient,
-                  CLUSTER_NAME,
-                  instanceColonToUnderscoreFormat(TEST_NODE),
-                  false);
-    ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --dropNode "
-        + CLUSTER_NAME + " " + TEST_NODE));
-  }
-  @Test()
-  public void testSetGetConfig() throws Exception
-  {
-    System.out.println("START testSetGetConfig() " + new Date(System.currentTimeMillis()));
-    // basic
-    String scopesStr = "CLUSTER=" + CLUSTER_NAME + ",PARTICIPANT=localhost_0";
-    String propertiesStr = "key1=value1,key2=value2";
-    String keysStr = "key1,key2";
-    _clusterSetup.setConfig(scopesStr, propertiesStr);
-    String valuesStr = _clusterSetup.getConfig(scopesStr, keysStr);
-    Assert.assertEquals(valuesStr, propertiesStr);
-    System.out.println("END testSetGetConfig() " + new Date(System.currentTimeMillis()));
-  }
-  @Test
-  public void testEnableCluster() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            5, // number of nodes
-                            3, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-    // pause cluster
-    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
-        "--enableCluster", clusterName, "false" });
-    Builder keyBuilder = new Builder(clusterName);
-    boolean exists = _gZkClient.exists(keyBuilder.pause().getPath());
-    Assert.assertTrue(exists, "pause node under controller should be created");
-    // resume cluster
-    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
-        "--enableCluster", clusterName, "true" });
-    exists = _gZkClient.exists(keyBuilder.pause().getPath());
-    Assert.assertFalse(exists, "pause node under controller should be removed");
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-  @Test
-  public void testDropInstance() throws Exception
-  {
-    // drop without stop, should throw exception
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            5, // number of nodes
-                            3, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-    // add fake liveInstance
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = new Builder(clusterName);
-    LiveInstance liveInstance = new LiveInstance("localhost_12918");
-    liveInstance.setSessionId("session_0");
-    liveInstance.setHelixVersion("version_0");
-    accessor.setProperty(keyBuilder.liveInstance("localhost_12918"), liveInstance);
-    // drop without stop the process, should throw exception
-    try
-    {
-      ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
-          "--dropNode", clusterName, "localhost:12918" });
-"Should throw exception since localhost_12918 is still in LIVEINSTANCES/");
-    }
-    catch (Exception e)
-    {
-      // OK
-    }
-    accessor.removeProperty(keyBuilder.liveInstance("localhost_12918"));
-    // drop without disable, should throw exception
-    try
-    {
-      ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
-          "--dropNode", clusterName, "localhost:12918" });
-"Should throw exception since localhost_12918 is enabled");
-    }
-    catch (Exception e)
-    {
-      // e.printStackTrace();
-      // OK
-    }
-    // drop it
-    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
-        "--enableInstance", clusterName, "localhost_12918", "false" });
-    ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR, "--dropNode",
-        clusterName, "localhost:12918" });
-    Assert.assertNull(accessor.getProperty(keyBuilder.instanceConfig("localhost_12918")),
-                      "Instance config should be dropped");
-    Assert.assertFalse(_gZkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
-                                                                    clusterName,
-                                                                    "localhost_12918")),
-                       "Instance/host should be dropped");
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }