You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[36/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java b/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
new file mode 100644
index 0000000..3fc16be
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
@@ -0,0 +1,275 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.manager.file.FileHelixDataAccessor;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+public class MockFileHelixManager implements HelixManager
+{
+  // Mock HelixManager: identity getters and getHelixDataAccessor() return real values; everything else is a stub.
+  private final HelixDataAccessor _accessor;
+  private final String _instanceName;
+  private final String _clusterName;
+  private final InstanceType _type;
+
+  public MockFileHelixManager(String clusterName, String instanceName, InstanceType type,
+                              FilePropertyStore<ZNRecord> store)
+  {
+    _instanceName = instanceName;
+    _clusterName = clusterName;
+    _type = type;
+    // Build the accessor handed out by getHelixDataAccessor() from the supplied file store.
+    _accessor = new FileHelixDataAccessor(store, clusterName);
+  }
+
+  @Override
+  public void connect() throws Exception
+  {
+    // Stub: does nothing in this mock.
+
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    // Stub: this mock always reports "not connected".
+    return false;
+  }
+
+  @Override
+  public void disconnect()
+  {
+    // Stub: does nothing in this mock.
+
+  }
+
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+                                            String instanceName,
+                                            String sessionId) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public boolean removeListener(Object listener)
+  {
+    // Stub: the add*Listener methods store nothing, so this always returns false.
+    return false;
+  }
+
+  @Override
+  public DataAccessor getDataAccessor()
+  {
+    return null;
+  }
+
+  @Override
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  @Override
+  public String getInstanceName()
+  {
+    return _instanceName;
+  }
+
+  @Override
+  public String getSessionId()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public long getLastNotificationTime()
+  {
+    // Stub: always returns 0.
+    return 0;
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener)
+  {
+    // Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public PropertyStore<ZNRecord> getPropertyStore()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public InstanceType getInstanceType()
+  {
+    return _type;
+  }
+
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener,
+  		String instanceName) throws Exception {
+  	// Stub: the listener is ignored.
+
+  }
+
+  @Override
+  public String getVersion()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public boolean isLeader()
+  {
+    // Stub: always returns false.
+    return false;
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+  @Override
+  public void startTimerTasks()
+  {
+    // Stub: does nothing in this mock.
+
+  }
+
+  @Override
+  public void stopTimerTasks()
+  {
+    // Stub: does nothing in this mock.
+
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor()
+  {
+    return _accessor;
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback)
+  {
+    // Stub: the callback is ignored.
+    
+  }
+
+  @Override
+  public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+  {
+    // Stub: always returns null.
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
new file mode 100644
index 0000000..77fe5a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.FileCallbackHandler;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestFileCallbackHandler
+{
+  @Test(groups = { "unitTest" })
+  public void testFileCallbackHandler()
+  {
+    final String clusterName = "TestFileCallbackHandler";
+    final String rootNamespace = "/tmp/" + clusterName;
+    final String instanceName = "controller_0";
+    MockListener listener = new MockListener();
+
+    PropertyJsonSerializer<ZNRecord> serializer =
+        new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
+    PropertyJsonComparator<ZNRecord> comparator =
+        new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+    FilePropertyStore<ZNRecord> store =
+        new FilePropertyStore<ZNRecord>(serializer, rootNamespace, comparator);
+    try
+    {
+      store.removeRootNamespace();
+    }
+    catch (PropertyStoreException e)
+    {
+      // Best-effort cleanup before the test: a failure to remove the namespace is only printed.
+      e.printStackTrace();
+    }
+
+    listener.reset();
+    MockFileHelixManager manager =
+        new MockFileHelixManager(clusterName, instanceName, InstanceType.CONTROLLER, store);
+    FileCallbackHandler handler =
+        new FileCallbackHandler(manager,
+                                   rootNamespace,
+                                   listener,
+                                   new EventType[] { EventType.NodeChildrenChanged,
+                                       EventType.NodeDeleted, EventType.NodeCreated },
+                                   ChangeType.CONFIG);
+    AssertJUnit.assertEquals(listener, handler.getListener());
+    AssertJUnit.assertEquals(rootNamespace, handler.getPath());
+    AssertJUnit.assertTrue(listener.isConfigChangeListenerInvoked);
+    // Same construction with ChangeType.EXTERNAL_VIEW; the external-view flag is expected to be set.
+    handler =
+        new FileCallbackHandler(manager,
+                                   rootNamespace,
+                                   listener,
+                                   new EventType[] { EventType.NodeChildrenChanged,
+                                       EventType.NodeDeleted, EventType.NodeCreated },
+                                   ChangeType.EXTERNAL_VIEW);
+    AssertJUnit.assertTrue(listener.isExternalViewChangeListenerInvoked);
+    // Same construction with ChangeType.CONTROLLER, also comparing getEventTypes() with the array passed in.
+    EventType[] eventTypes = new EventType[] { EventType.NodeChildrenChanged,
+        EventType.NodeDeleted, EventType.NodeCreated };
+    handler =
+        new FileCallbackHandler(manager,
+                                   rootNamespace,
+                                   listener,
+                                   eventTypes,
+                                   ChangeType.CONTROLLER);
+    AssertJUnit.assertEquals(handler.getEventTypes(), eventTypes);
+    AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+    // After listener.reset() (presumably clears its flags - confirm), handler.reset() is expected to invoke it again.
+    listener.reset();
+    handler.reset();
+    AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+    // Likewise for a property-change notification on the root namespace.
+    listener.reset();
+    handler.onPropertyChange(rootNamespace);
+    AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+
+    store.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
new file mode 100644
index 0000000..ba1c6cc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.file.FileDataAccessor;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestFileDataAccessor
+{
+  @Test()
+  public void testFileDataAccessor()
+  {
+    final String clusterName = "TestFileDataAccessor";
+    String rootNamespace = "/tmp/" + clusterName;
+    PropertyJsonSerializer<ZNRecord> serializer = new PropertyJsonSerializer<ZNRecord>(
+        ZNRecord.class);
+    PropertyJsonComparator<ZNRecord> comparator = new PropertyJsonComparator<ZNRecord>(
+        ZNRecord.class);
+    FilePropertyStore<ZNRecord> store = new FilePropertyStore<ZNRecord>(serializer, rootNamespace,
+        comparator);
+    try
+    {
+      store.removeRootNamespace();
+    } catch (PropertyStoreException e)
+    {
+      // Best-effort cleanup before the test: a failure to remove the namespace is only printed.
+      e.printStackTrace();
+    }
+    DataAccessor accessor = new FileDataAccessor(store, clusterName);
+    // setProperty followed by getProperty should return the config stored under the (PARTICIPANT, "key0") keys.
+    InstanceConfig config = new InstanceConfig("id0");
+    accessor.setProperty(PropertyType.CONFIGS, config, ConfigScopeProperty.PARTICIPANT.toString(),
+        "key0");
+    config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+        ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+    AssertJUnit.assertEquals("id0", config.getId());
+    // updateProperty on the existing entry is expected to change its id to "id1".
+    InstanceConfig newConfig = new InstanceConfig("id1");
+    accessor.updateProperty(PropertyType.CONFIGS, newConfig,
+        ConfigScopeProperty.PARTICIPANT.toString(),
+        "key0");
+    config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+        ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+    AssertJUnit.assertEquals("id1", config.getId());
+    // After removeProperty the entry is expected to read back as null.
+    accessor.removeProperty(PropertyType.CONFIGS, ConfigScopeProperty.PARTICIPANT.toString(),
+        "key0");
+    config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+        ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+    AssertJUnit.assertNull(config);
+    // updateProperty on a LEADER that has not been set yet is expected to leave it absent.
+    LiveInstance leader = new LiveInstance("id2");
+    accessor.updateProperty(PropertyType.LEADER, leader);
+    LiveInstance nullLeader = accessor.getProperty(LiveInstance.class, PropertyType.LEADER);
+    AssertJUnit.assertNull(nullLeader);
+    // Once LEADER has been set, updateProperty is expected to take effect (id becomes "id3").
+    accessor.setProperty(PropertyType.LEADER, leader);
+    LiveInstance newLeader = new LiveInstance("id3");
+    accessor.updateProperty(PropertyType.LEADER, newLeader);
+    leader = accessor.getProperty(LiveInstance.class, PropertyType.LEADER);
+    AssertJUnit.assertEquals("id3", leader.getId());
+
+    // List<ZNRecord> childs = accessor.getChildValues(PropertyType.HISTORY);
+    // AssertJUnit.assertEquals(childs.size(), 0);
+
+    store.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
new file mode 100644
index 0000000..388e753
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.helix.manager.file.StaticFileHelixManager.DBParam;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestStaticFileCM
+{
+  @Test()
+  public void testStaticFileCM()
+  {
+    final String clusterName = "TestSTaticFileCM";
+    final String controllerName = "controller_0";
+
+    ClusterView view;
+    String[] illegalNodesInfo = {"localhost_8900", "localhost_8901"};
+    List<DBParam> dbParams = new ArrayList<DBParam>();
+    dbParams.add(new DBParam("TestDB0", 10));
+    dbParams.add(new DBParam("TestDB1", 10));
+    // The first node list is expected to be rejected (presumably because it uses '_' rather than "host:port" - confirm).
+    boolean exceptionCaught = false;
+    try
+    {
+      view = StaticFileHelixManager.generateStaticConfigClusterView(illegalNodesInfo, dbParams, 3);
+    } catch (IllegalArgumentException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+    String[] nodesInfo = {"localhost:8900", "localhost:8901", "localhost:8902"};
+    view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, 2);
+    // Serialize the view to a file; the same path is later passed to the manager. restoredView is not inspected.
+    String configFile = "/tmp/" + clusterName;
+    ClusterViewSerializer.serialize(view, new File(configFile));
+    ClusterView restoredView = ClusterViewSerializer.deserialize(new File(configFile));
+    // System.out.println(restoredView);
+    // byte[] bytes = ClusterViewSerializer.serialize(restoredView);
+    // System.out.println(new String(bytes));
+
+    StaticFileHelixManager.verifyFileBasedClusterStates("localhost_8900",
+                                       configFile, configFile);
+
+    StaticFileHelixManager controller = new StaticFileHelixManager(clusterName, controllerName,
+                                                     InstanceType.CONTROLLER, configFile);
+    controller.disconnect();
+    AssertJUnit.assertFalse(controller.isConnected());
+    controller.connect();
+    AssertJUnit.assertTrue(controller.isConnected());
+
+    String sessionId = controller.getSessionId();
+//    AssertJUnit.assertEquals(DynamicFileClusterManager._sessionId, sessionId);
+    AssertJUnit.assertEquals(clusterName, controller.getClusterName());
+    AssertJUnit.assertEquals(0, controller.getLastNotificationTime());
+    AssertJUnit.assertEquals(InstanceType.CONTROLLER, controller.getInstanceType());
+    AssertJUnit.assertNull(controller.getPropertyStore());
+    AssertJUnit.assertNull(controller.getHealthReportCollector());
+    AssertJUnit.assertEquals(controllerName, controller.getInstanceName());
+    AssertJUnit.assertNull(controller.getClusterManagmentTool());
+    AssertJUnit.assertNull(controller.getMessagingService());
+
+    MockListener controllerListener = new MockListener();
+    AssertJUnit.assertFalse(controller.removeListener(controllerListener));
+    controllerListener.reset();
+
+    controller.addIdealStateChangeListener(controllerListener);
+    AssertJUnit.assertTrue(controllerListener.isIdealStateChangeListenerInvoked);
+
+    controller.addMessageListener(controllerListener, "localhost_8900");
+    AssertJUnit.assertTrue(controllerListener.isMessageListenerInvoked);
+    // Each of the remaining listener registrations is expected to throw UnsupportedOperationException.
+    exceptionCaught = false;
+    try
+    {
+      controller.addLiveInstanceChangeListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addCurrentStateChangeListener(controllerListener, "localhost_8900", sessionId);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addConfigChangeListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addExternalViewChangeListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addControllerListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java b/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
new file mode 100644
index 0000000..8db8802
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
@@ -0,0 +1,282 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.util.List;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.DynamicFileHelixManager;
+import org.apache.helix.manager.file.FileHelixAdmin;
+import org.apache.helix.mock.storage.DummyProcess;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class UnitTestDynFileClusterMgr
+{
+  final String className = "UnitTestDynFileClusterMgr";
+  final String _rootNamespace = "/tmp/" + className;
+  final String instanceName = "localhost_12918";
+
+  FilePropertyStore<ZNRecord> _store;
+  HelixAdmin _mgmtTool;
+
+  @BeforeClass
+  public void beforeClass()
+  {
+    _store = new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+        _rootNamespace, new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+    try
+    {
+      _store.removeRootNamespace();
+    } catch (PropertyStoreException e)
+    {
+      // Best-effort cleanup before the tests: a failure to remove the namespace is only printed.
+      e.printStackTrace();
+    }
+
+    _mgmtTool = new FileHelixAdmin(_store);
+  }
+
+  @Test()
+  public void testBasic() throws PropertyStoreException
+  {
+    final String clusterName = className + "_basic";
+    String controllerName = "controller_0";
+    DynamicFileHelixManager controller = new DynamicFileHelixManager(clusterName,
+        controllerName, InstanceType.CONTROLLER, _store);
+
+    _mgmtTool.addCluster(clusterName, true);
+    _mgmtTool.addInstance(clusterName, new InstanceConfig(instanceName));
+
+    DynamicFileHelixManager participant = new DynamicFileHelixManager(clusterName,
+        instanceName, InstanceType.PARTICIPANT, _store);
+
+    AssertJUnit.assertEquals(instanceName, participant.getInstanceName());
+
+    controller.disconnect();
+    AssertJUnit.assertFalse(controller.isConnected());
+    controller.connect();
+    AssertJUnit.assertTrue(controller.isConnected());
+
+    String sessionId = controller.getSessionId();
+    // AssertJUnit.assertEquals(DynamicFileClusterManager._sessionId,
+    // sessionId);
+    AssertJUnit.assertEquals(clusterName, controller.getClusterName());
+    AssertJUnit.assertEquals(0, controller.getLastNotificationTime());
+    AssertJUnit.assertEquals(InstanceType.CONTROLLER, controller.getInstanceType());
+
+    // AssertJUnit.assertNull(controller.getPropertyStore());
+    PropertyStore<ZNRecord> propertyStore = controller.getPropertyStore();
+    AssertJUnit.assertNotNull(propertyStore);
+    propertyStore.setProperty("testKey", new ZNRecord("testValue"));
+    ZNRecord record = propertyStore.getProperty("testKey");
+    Assert.assertEquals(record.getId(), "testValue");
+
+    AssertJUnit.assertNull(controller.getHealthReportCollector());
+
+    MockListener controllerListener = new MockListener();
+    controllerListener.reset();
+
+    controller.addIdealStateChangeListener(controllerListener);
+    AssertJUnit.assertTrue(controllerListener.isIdealStateChangeListenerInvoked);
+
+    controller.addLiveInstanceChangeListener(controllerListener);
+    AssertJUnit.assertTrue(controllerListener.isLiveInstanceChangeListenerInvoked);
+
+    controller.addCurrentStateChangeListener(controllerListener, controllerName, sessionId);
+    AssertJUnit.assertTrue(controllerListener.isCurrentStateChangeListenerInvoked);
+    // Config, external-view and controller listeners are each expected to throw UnsupportedOperationException.
+    boolean exceptionCaught = false;
+    try
+    {
+      controller.addConfigChangeListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addExternalViewChangeListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addControllerListener(controllerListener);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    AssertJUnit.assertFalse(controller.removeListener(controllerListener));
+
+    exceptionCaught = false;
+    try
+    {
+      controller.addIdealStateChangeListener(null);
+    } catch (HelixException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    // test message service
+    ClusterMessagingService msgService = controller.getMessagingService();
+    // NOTE(review): msgService is fetched but never used or asserted on.
+    // test file management tool
+    HelixAdmin tool = controller.getClusterManagmentTool();
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getClusters();
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getResourcesInCluster(clusterName);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.addResource(clusterName, "resource", 10, "MasterSlave",
+          IdealStateModeProperty.AUTO.toString());
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getStateModelDefs(clusterName);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getInstanceConfig(clusterName, instanceName);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getStateModelDef(clusterName, "MasterSlave");
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.getResourceExternalView(clusterName, "resource");
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    exceptionCaught = false;
+    try
+    {
+      tool.enableInstance(clusterName, "resource", false);
+    } catch (UnsupportedOperationException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+    // The remaining admin operations are expected to succeed rather than throw.
+    tool.addCluster(clusterName, true);
+    tool.addResource(clusterName, "resource", 10, "MasterSlave");
+    InstanceConfig config = new InstanceConfig("nodeConfig");
+    tool.addInstance(clusterName, config);
+    List<String> instances = tool.getInstancesInCluster(clusterName);
+    AssertJUnit.assertEquals(1, instances.size());
+    tool.dropInstance(clusterName, config);
+
+    IdealState idealState = new IdealState("idealState");
+    tool.setResourceIdealState(clusterName, "resource", idealState);
+    idealState = tool.getResourceIdealState(clusterName, "resource");
+    AssertJUnit.assertEquals(idealState.getId(), "idealState");
+
+    tool.dropResource(clusterName, "resource");
+    _store.stop();
+  }
+  // NOTE(review): testBasic() stops the shared _store; confirm the test below still works if it runs afterwards.
+  @Test
+  public void testStartInstanceBeforeAdd()
+  {
+    final String clusterName = className + "_startInsB4Add";
+    _mgmtTool.addCluster(clusterName, true);
+
+    try
+    {
+      new DummyProcess(null, clusterName, instanceName, "dynamic-file", null, 0, _store).start();
+      Assert.fail("Should fail since instance is not configured");
+    } catch (HelixException e)
+    {
+      // Expected: this test never adds the instance to the cluster before starting it.
+    } catch (Exception e)
+    {
+      Assert.fail("Should not fail on exceptions other than ClusterManagerException");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
new file mode 100644
index 0000000..f3e6798
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory;
+import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory.DefaultControllerMessageHandler;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link DefaultControllerMessageHandlerFactory} and its
+ * {@link DefaultControllerMessageHandler}.
+ * <p>
+ * Both are expected to accept a {@code CONTROLLER_MSG} message and to throw a
+ * {@link HelixException} for a {@code NO_OP} message.
+ */
+public class TestDefaultControllerMsgHandlerFactory
+{
+  /**
+   * Drives the factory and the handler with one unsupported ({@code NO_OP}) and one
+   * supported ({@code CONTROLLER_MSG}) message each.
+   *
+   * @throws InterruptedException if the thread is interrupted while a message is handled;
+   *           propagated so that an interrupt fails the test instead of being swallowed
+   */
+  @Test()
+  public void testDefaultControllerMsgHandlerFactory() throws InterruptedException
+  {
+    System.out.println("START TestDefaultControllerMsgHandlerFactory at " + new Date(System.currentTimeMillis()));
+
+    DefaultControllerMessageHandlerFactory factory = new DefaultControllerMessageHandlerFactory();
+    NotificationContext context = new NotificationContext(null);
+
+    // 1. factory + NO_OP message: creation must be rejected
+    Message message = new Message(MessageType.NO_OP, "0");
+    boolean exceptionCaught = false;
+    try
+    {
+      factory.createHandler(message, context);
+    } catch (HelixException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    // 2. factory + CONTROLLER_MSG message: creation must succeed
+    message = new Message(MessageType.CONTROLLER_MSG, "1");
+    exceptionCaught = false;
+    try
+    {
+      factory.createHandler(message, context);
+    } catch (HelixException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertFalse(exceptionCaught);
+
+    // 3. handler built directly around a NO_OP message: handling must be rejected
+    message = new Message(MessageType.NO_OP, "3");
+    DefaultControllerMessageHandler defaultHandler = new DefaultControllerMessageHandler(message, context);
+    // reset explicitly so this check does not depend on the outcome of the previous one
+    exceptionCaught = false;
+    try
+    {
+      defaultHandler.handleMessage();
+    } catch (HelixException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+
+    // 4. handler built directly around a CONTROLLER_MSG message: handling must succeed
+    message = new Message(MessageType.CONTROLLER_MSG, "4");
+    defaultHandler = new DefaultControllerMessageHandler(message, context);
+    exceptionCaught = false;
+    try
+    {
+      defaultHandler.handleMessage();
+    } catch (HelixException e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertFalse(exceptionCaught);
+    System.out.println("END TestDefaultControllerMsgHandlerFactory at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
new file mode 100644
index 0000000..cfacedb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -0,0 +1,71 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHandleNewSession extends ZkIntegrationTestBase
+{
+  @Test
+  public void testHandleNewSession() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ZKHelixManager manager =
+        new ZKHelixManager(clusterName,
+                               "localhost_12918",
+                               InstanceType.PARTICIPANT,
+                               ZK_ADDR);
+    manager.connect();
+   
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String lastSessionId = manager.getSessionId();
+    for (int i = 0; i < 3; i++)
+    {
+      // System.err.println("curSessionId: " + lastSessionId);
+      ZkTestHelper.expireSession(manager._zkClient);
+
+      String sessionId = manager.getSessionId();
+      Assert.assertTrue(sessionId.compareTo(lastSessionId) > 0, "Session id should be increased after expiry");
+      lastSessionId = sessionId;
+
+      // make sure session id is not 0
+      Assert.assertFalse(sessionId.equals("0"),
+                         "Hit race condition in zhclient.handleNewSession(). sessionId is not returned yet.");
+      
+      // TODO: need to test session expiry during handleNewSession()
+    }
+
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("Disconnecting ...");
+    manager.disconnect();
+    
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
new file mode 100644
index 0000000..53da22b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Bounces two participants and checks that the cluster converges again and that the
+ * controller ends up with two more handlers than before.
+ */
+public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  /**
+   * Disconnects and restarts the first two participants, waits for the cluster state to
+   * be verified and then compares the controller's handler count with its initial value.
+   *
+   * @throws Exception on any failure, including an interrupt of the test thread while it
+   *           waits; interrupts are propagated rather than swallowed
+   */
+  @Test
+  public void testInstanceBounce() throws Exception
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    StartCMResult controllerResult = _startCMResultMap.get(controllerName);
+    ZKHelixManager controller = (ZKHelixManager) controllerResult._manager;
+    int handlerSize = controller.getHandlers().size();
+
+    for (int i = 0; i < 2; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      // kill the participant
+      StartCMResult running = _startCMResultMap.get(instanceName);
+      running._manager.disconnect();
+      running._thread.interrupt();
+      Thread.sleep(1000);
+
+      // restart the participant
+      StartCMResult restarted = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _startCMResultMap.put(instanceName, restarted);
+      Thread.sleep(100);
+    }
+    Thread.sleep(2000);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME), 50 * 1000);
+    Assert.assertTrue(result);
+    // When a new live instance is created, we still add current state listener to it thus number should increase by 2
+    Assert.assertEquals(controller.getHandlers().size(), handlerSize + 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
new file mode 100644
index 0000000..3816f52
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
@@ -0,0 +1,278 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWtCacheAsyncOpMultiThread extends ZkUnitTestBase
+{
+  /**
+   * Task that creates the current states {@code TestDB0..TestDB9} through the cache
+   * accessor in two batches of five. Two instances (ids 0 and 1) are run together;
+   * the sleeps stagger them so that each batch is attempted first by a different task.
+   */
+  class TestCreateZkCacheBaseDataAccessor implements Callable<Boolean>
+  {
+    final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+    final String                         _clusterName;
+    final int _id;
+
+    public TestCreateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+    {
+      _accessor = accessor;
+      _clusterName = clusterName;
+      _id = id;
+    }
+
+    @Override
+    public Boolean call() throws Exception
+    {
+      List<String> batchPaths = new ArrayList<String>();
+      List<ZNRecord> batchRecords = new ArrayList<ZNRecord>();
+
+      for (int batch = 0; batch < 2; batch++)
+      {
+        batchPaths.clear();
+        batchRecords.clear();
+
+        if (batch == 0 && _id == 1)
+        {
+          // give task 0 a head start on TestDB0-4
+          Thread.sleep(30);
+        }
+
+        if (batch == 1 && _id == 0)
+        {
+          // give task 1 a head start on TestDB5-9
+          Thread.sleep(100);
+        }
+
+        int first = batch * 5;
+        for (int k = first; k < first + 5; k++)
+        {
+          String dbName = "TestDB" + k;
+          batchPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                                    _clusterName,
+                                                    "localhost_8901",
+                                                    "session_0",
+                                                    dbName));
+          batchRecords.add(new ZNRecord(dbName));
+        }
+
+        boolean[] created = _accessor.createChildren(batchPaths, batchRecords, AccessOption.PERSISTENT);
+
+        // the creates of one batch are expected to be all true or all false
+        for (int i = 1; i < 5; i++)
+        {
+          Assert.assertEquals(created[i], created[0], "Should be either all succeed of all fail");
+        }
+      }
+
+      return true;
+    }
+  }
+  
+  /**
+   * Task that updates the current states {@code TestDB0..TestDB9} ten times through the
+   * cache accessor; round {@code j} merges the simple field {@code j -> j} into each one.
+   */
+  class TestUpdateZkCacheBaseDataAccessor implements Callable<Boolean>
+  {
+    final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+    final String                         _clusterName;
+    final int _id;
+
+    public TestUpdateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+    {
+      _accessor = accessor;
+      _clusterName = clusterName;
+      _id = id;
+    }
+
+    @Override
+    public Boolean call() throws Exception
+    {
+      List<String> roundPaths = new ArrayList<String>();
+      List<DataUpdater<ZNRecord>> roundUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
+
+      for (int round = 0; round < 10; round++)
+      {
+        roundPaths.clear();
+        roundUpdaters.clear();
+
+        String fieldName = "" + round;
+        for (int db = 0; db < 10; db++)
+        {
+          String dbName = "TestDB" + db;
+          roundPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                                    _clusterName,
+                                                    "localhost_8901",
+                                                    "session_0",
+                                                    dbName));
+
+          ZNRecord delta = new ZNRecord(dbName);
+          delta.setSimpleField(fieldName, fieldName);
+          DataUpdater<ZNRecord> updater = new ZNRecordUpdater(delta);
+          roundUpdaters.add(updater);
+        }
+
+        boolean[] updated = _accessor.updateChildren(roundPaths, roundUpdaters, AccessOption.PERSISTENT);
+
+        for (int db = 0; db < 10; db++)
+        {
+          Assert.assertEquals(updated[db], true, "Should be all succeed");
+        }
+      }
+
+      return true;
+    }
+  }
+  
+  /**
+   * Task that sets the external views {@code TestDB0..TestDB9} through the cache
+   * accessor in two batches of five. Two instances (ids 0 and 1) are run together;
+   * the sleeps stagger them so that each batch is attempted first by a different task.
+   */
+  class TestSetZkCacheBaseDataAccessor implements Callable<Boolean>
+  {
+    final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+    final String                         _clusterName;
+    final int _id;
+
+    public TestSetZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+    {
+      _accessor = accessor;
+      _clusterName = clusterName;
+      _id = id;
+    }
+
+    @Override
+    public Boolean call() throws Exception
+    {
+      List<String> batchPaths = new ArrayList<String>();
+      List<ZNRecord> batchRecords = new ArrayList<ZNRecord>();
+
+      for (int batch = 0; batch < 2; batch++)
+      {
+        batchPaths.clear();
+        batchRecords.clear();
+
+        if (batch == 0 && _id == 1)
+        {
+          // give task 0 a head start on TestDB0-4
+          Thread.sleep(30);
+        }
+
+        if (batch == 1 && _id == 0)
+        {
+          // give task 1 a head start on TestDB5-9
+          Thread.sleep(100);
+        }
+
+        int first = batch * 5;
+        for (int k = first; k < first + 5; k++)
+        {
+          String dbName = "TestDB" + k;
+          batchPaths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName, dbName));
+          batchRecords.add(new ZNRecord(dbName));
+        }
+
+        boolean[] written = _accessor.setChildren(batchPaths, batchRecords, AccessOption.PERSISTENT);
+
+        for (int i = 0; i < 5; i++)
+        {
+          Assert.assertEquals(written[i], true);
+        }
+      }
+
+      return true;
+    }
+  }
+  
+  /**
+   * Runs the create, update and set tasks with two concurrent callers each and checks
+   * after every phase that the write-through cache agrees with the data on ZooKeeper.
+   */
+  @Test
+  public void testHappyPathZkCacheBaseDataAccessor()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // the two sub-trees mirrored by the cache
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+    String extViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+    List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+    ZkCacheBaseDataAccessor<ZNRecord> accessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, cachePaths, null);
+    assertWtCacheMatchesZk(cachePaths, accessor);
+
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+
+    // phase 1: create 10 current states using 2 threads
+    for (int id = 0; id < 2; id++)
+    {
+      tasks.add(new TestCreateZkCacheBaseDataAccessor(accessor, clusterName, id));
+    }
+    TestHelper.startThreadsConcurrently(tasks, 1000);
+    assertWtCacheMatchesZk(cachePaths, accessor);
+
+    // phase 2: update 10 current states 10 times using 2 threads
+    tasks.clear();
+    for (int id = 0; id < 2; id++)
+    {
+      tasks.add(new TestUpdateZkCacheBaseDataAccessor(accessor, clusterName, id));
+    }
+    TestHelper.startThreadsConcurrently(tasks, 1000);
+    assertWtCacheMatchesZk(cachePaths, accessor);
+
+    // phase 3: set 10 external views using 2 threads
+    tasks.clear();
+    for (int id = 0; id < 2; id++)
+    {
+      tasks.add(new TestSetZkCacheBaseDataAccessor(accessor, clusterName, id));
+    }
+    TestHelper.startThreadsConcurrently(tasks, 1000);
+    assertWtCacheMatchesZk(cachePaths, accessor);
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /** Fails the test unless the accessor's write-through cache matches ZooKeeper under {@code cachePaths}. */
+  private void assertWtCacheMatchesZk(List<String> cachePaths, ZkCacheBaseDataAccessor<ZNRecord> accessor)
+  {
+    boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
new file mode 100644
index 0000000..ef47a8a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
@@ -0,0 +1,253 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWtCacheAsyncOpSingleThread extends ZkUnitTestBase
+{
+  @Test
+  public void testHappyPathZkCacheBaseDataAccessor()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init zkCacheDataAccessor
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                   clusterName,
+                                   "localhost_8901");
+    String extViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+    List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+    ZkCacheBaseDataAccessor<ZNRecord> accessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+                                           null,
+                                           cachePaths,
+                                           null);
+
+    boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+    // create 10 current states
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    for (int i = 0; i < 10; i++)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                     clusterName,
+                                     "localhost_8901",
+                                     "session_0",
+                                     "TestDB" + i);
+      ZNRecord record = new ZNRecord("TestDB" + i);
+
+      paths.add(path);
+      records.add(record);
+    }
+
+    boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
+    }
+
+    // verify wtCache
+    // TestHelper.printCache(accessor._wtCache);
+    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+    // update each current state 10 times
+    List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+    for (int j = 0; j < 10; j++)
+    {
+      paths.clear();
+      updaters.clear();
+      for (int i = 0; i < 10; i++)
+      {
+        String path = curStatePath + "/session_0/TestDB" + i;
+        ZNRecord newRecord = new ZNRecord("TestDB" + i);
+        newRecord.setSimpleField("" + j, "" + j);
+        DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
+        paths.add(path);
+        updaters.add(updater);
+      }
+      success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+
+      for (int i = 0; i < 10; i++)
+      {
+        Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
+      }
+    }
+
+    // verify cache
+    // TestHelper.printCache(accessor._wtCache);
+    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+    // set 10 external views
+    paths.clear();
+    records.clear();
+    for (int i = 0; i < 10; i++)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+      ZNRecord record = new ZNRecord("TestDB" + i);
+
+      paths.add(path);
+      records.add(record);
+    }
+    success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
+    }
+
+    // verify wtCache
+    // TestHelper.printCache(accessor._wtCache);
+    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+    // get 10 external views
+    paths.clear();
+    records.clear();
+    for (int i = 0; i < 10; i++)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+      paths.add(path);
+    }
+
+    records = accessor.get(paths, null, 0);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+    }
+
+    // getChildren
+    records.clear();
+    records = accessor.getChildren(extViewPath, null, 0);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+    }
+
+    // exists
+    paths.clear();
+    for (int i = 0; i < 10; i++)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                     clusterName,
+                                     "localhost_8901",
+                                     "session_0",
+                                     "TestDB" + i);
+      paths.add(path);
+    }
+    success = accessor.exists(paths, 0);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(success[i], "Should exits: TestDB" + i);
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+  
+  /**
+   * Creates ten current states under {@code session_1} through the cache accessor and
+   * then repeats the identical create; the second attempt must fail for every path.
+   */
+  @Test
+  public void testCreateFailZkCacheBaseAccessor()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // the two sub-trees handed to the cache
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+    String extViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+    List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+    ZkCacheBaseDataAccessor<ZNRecord> accessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, cachePaths, null);
+
+    // only the current-state path was created above, so the cache should hold just that entry
+    Assert.assertEquals(accessor._wtCache._cache.size(), 1, "Should contain only:\n"
+        + curStatePath);
+    Assert.assertTrue(accessor._wtCache._cache.containsKey(curStatePath));
+
+    // build the ten (path, record) pairs
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    for (int i = 0; i < 10; i++)
+    {
+      String dbName = "TestDB" + i;
+      paths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                           clusterName,
+                                           "localhost_8901",
+                                           "session_1",
+                                           dbName));
+      records.add(new ZNRecord(dbName));
+    }
+
+    // first create: every path is new
+    boolean[] firstAttempt = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(firstAttempt[i], "Should succeed in create: " + paths.get(i));
+    }
+
+    // second create of the same paths: every node already exists
+    boolean[] secondAttempt = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++)
+    {
+      Assert.assertFalse(secondAttempt[i], "Should fail on create: " + paths.get(i));
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
new file mode 100644
index 0000000..a9d81e1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
@@ -0,0 +1,196 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Single-threaded tests that drive {@link ZkCacheBaseDataAccessor} with
+ * synchronous calls and inspect its {@code _wtCache} field.
+ */
+public class TestWtCacheSyncOpSingleThread extends ZkUnitTestBase
+{
+  // TODO: add TestZkCacheSyncOpSingleThread
+  // TODO: add TestZkCacheAsyncOpMultiThread
+
+  /**
+   * Runs create / update / set through the cached accessor and, after each of
+   * those phases, asserts that TestHelper.verifyZkCache accepts the accessor's
+   * {@code _wtCache}; finally reads back through get / getChildNames / exists.
+   */
+  @Test
+  public void testHappyPathZkCacheBaseDataAccessor() throws Exception
+  {
+    // NOTE(review): these TestHelper calls presumably derive the names from the
+    // calling frame, so they are kept directly inside the test method - confirm.
+    String testClass = TestHelper.getTestClassName();
+    String testMethod = TestHelper.getTestMethodName();
+    String clusterName = testClass + "_" + testMethod;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init zkCacheDataAccessor
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+    String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    zkAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+    List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+    ZkCacheBaseDataAccessor<ZNRecord> cachedAccessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(zkAccessor, null, cachePaths, null);
+
+    boolean cacheOk =
+        TestHelper.verifyZkCache(cachePaths, cachedAccessor._wtCache._cache, _gZkClient, true);
+    Assert.assertTrue(cacheOk, "wtCache doesn't match data on Zk");
+
+    // create 10 current states
+    for (int db = 0; db < 10; db++)
+    {
+      String dbPath = curStatePath + "/session_0/TestDB" + db;
+      Assert.assertTrue(cachedAccessor.create(dbPath,
+                                              new ZNRecord("TestDB" + db),
+                                              AccessOption.PERSISTENT),
+                        "Should succeed in create: " + dbPath);
+    }
+
+    // verify wtCache
+    cacheOk =
+        TestHelper.verifyZkCache(cachePaths, cachedAccessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(cacheOk, "wtCache doesn't match data on Zk");
+
+    // update each current state 10 times, single thread
+    for (int db = 0; db < 10; db++)
+    {
+      String dbPath = curStatePath + "/session_0/TestDB" + db;
+      for (int round = 0; round < 10; round++)
+      {
+        ZNRecord delta = new ZNRecord("TestDB" + db);
+        delta.setSimpleField(String.valueOf(round), String.valueOf(round));
+        Assert.assertTrue(cachedAccessor.update(dbPath,
+                                                new ZNRecordUpdater(delta),
+                                                AccessOption.PERSISTENT),
+                          "Should succeed in update: " + dbPath);
+      }
+    }
+
+    // verify cache
+    cacheOk =
+        TestHelper.verifyZkCache(cachePaths, cachedAccessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(cacheOk, "wtCache doesn't match data on Zk");
+
+    // set 10 external views
+    for (int db = 0; db < 10; db++)
+    {
+      String viewPath =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + db);
+      Assert.assertTrue(cachedAccessor.set(viewPath,
+                                           new ZNRecord("TestDB" + db),
+                                           AccessOption.PERSISTENT),
+                        "Should succeed in set: " + viewPath);
+    }
+
+    // verify wtCache
+    cacheOk =
+        TestHelper.verifyZkCache(cachePaths, cachedAccessor._wtCache._cache, _gZkClient, false);
+    Assert.assertTrue(cacheOk, "wtCache doesn't match data on Zk");
+
+    // get 10 external views
+    for (int db = 0; db < 10; db++)
+    {
+      String viewPath =
+          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + db);
+      ZNRecord view = cachedAccessor.get(viewPath, null, 0);
+      Assert.assertEquals(view.getId(), "TestDB" + db);
+    }
+
+    // getChildNames
+    List<String> viewNames = cachedAccessor.getChildNames(extViewPath, 0);
+    Assert.assertEquals(viewNames.size(), 10, "Should contain only: TestDB0-9");
+    for (int db = 0; db < 10; db++)
+    {
+      Assert.assertTrue(viewNames.contains("TestDB" + db));
+    }
+
+    // exists
+    for (int db = 0; db < 10; db++)
+    {
+      Assert.assertTrue(cachedAccessor.exists(currentStatePath(clusterName,
+                                                               "session_0",
+                                                               "TestDB" + db),
+                                              0));
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Creates ten current-state znodes through the cached accessor, then issues
+   * the same ten creates again and asserts that each of them reports failure.
+   */
+  @Test
+  public void testCreateFailZkCacheBaseDataAccessor()
+  {
+    // NOTE(review): kept inside the test method for the same reason as in the
+    // happy-path test (names presumably come from the calling frame) - confirm.
+    String testClass = TestHelper.getTestClassName();
+    String testMethod = TestHelper.getTestMethodName();
+    String clusterName = testClass + "_" + testMethod;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // init zkCacheDataAccessor
+    String curStatePath =
+        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+
+    ZkBaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZkCacheBaseDataAccessor<ZNRecord> cachedAccessor =
+        new ZkCacheBaseDataAccessor<ZNRecord>(zkAccessor,
+                                              null,
+                                              Arrays.asList(curStatePath),
+                                              null);
+
+    // create 10 current states
+    for (int db = 0; db < 10; db++)
+    {
+      String dbPath = currentStatePath(clusterName, "session_1", "TestDB" + db);
+      Assert.assertTrue(cachedAccessor.create(dbPath,
+                                              new ZNRecord("TestDB" + db),
+                                              AccessOption.PERSISTENT),
+                        "Should succeed in create: " + dbPath);
+    }
+
+    // create same 10 current states again, should fail
+    for (int db = 0; db < 10; db++)
+    {
+      String dbPath = currentStatePath(clusterName, "session_1", "TestDB" + db);
+      Assert.assertFalse(cachedAccessor.create(dbPath,
+                                               new ZNRecord("TestDB" + db),
+                                               AccessOption.PERSISTENT),
+                         "Should fail in create due to NodeExists: " + dbPath);
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Builds the CURRENTSTATES path for instance "localhost_8901" with the given
+   * session and resource keys.
+   */
+  private static String currentStatePath(String clusterName,
+                                         String sessionId,
+                                         String resourceName)
+  {
+    return PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                      clusterName,
+                                      "localhost_8901",
+                                      sessionId,
+                                      resourceName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
new file mode 100644
index 0000000..092403a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
+/**
+ * Tests set / get / remove / update and child listing of {@link ZKDataAccessor}
+ * using a dedicated {@link ZkClient} connected to ZK_ADDR. Results are checked
+ * by reading the znodes directly through that client.
+ */
+public class TestZKDataAccessor extends ZkUnitTestBase
+{
+  private DataAccessor _accessor;
+  private String _clusterName;
+  private final String resource = "resource";
+  private ZkClient _zkClient;
+
+  /**
+   * Writes an ideal state twice through the accessor and checks after each
+   * write that the znode exists and holds the written record.
+   */
+  @Test ()
+  public void testSet()
+  {
+    IdealState idealState = new IdealState(resource);
+    idealState.setNumPartitions(20);
+    idealState.setReplicas(Integer.toString(2));
+    idealState.setStateModelDefRef("StateModel1");
+    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+    boolean success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
+    AssertJUnit.assertTrue(success);
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+    AssertJUnit.assertTrue(_zkClient.exists(path));
+    AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
+
+    // NOTE(review): same value as the first write, so this second set only
+    // exercises overwriting an existing znode with identical data - confirm
+    // whether a different partition count was intended.
+    idealState.setNumPartitions(20);
+    success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
+    AssertJUnit.assertTrue(success);
+    AssertJUnit.assertTrue(_zkClient.exists(path));
+    AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
+  }
+
+  /**
+   * Creates the ideal-state znode directly with the ZkClient and checks that
+   * the accessor reads back an equal record.
+   */
+  @Test ()
+  public void testGet()
+  {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+    IdealState idealState = new IdealState(resource);
+    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+    // start from a clean znode: drop any previous one, make sure the parent exists
+    _zkClient.delete(path);
+    _zkClient.createPersistent(new File(path).getParent(), true);
+    _zkClient.createPersistent(path, idealState.getRecord());
+    IdealState idealStateRead = _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
+    AssertJUnit.assertEquals(idealState.getRecord(), idealStateRead.getRecord());
+  }
+
+  /**
+   * Creates the ideal-state znode directly, removes it through the accessor
+   * and checks that both the znode and the accessor's view of it are gone.
+   */
+  @Test ()
+  public void testRemove()
+  {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+    IdealState idealState = new IdealState(resource);
+    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+    // start from a clean znode: drop any previous one, make sure the parent exists
+    _zkClient.delete(path);
+    _zkClient.createPersistent(new File(path).getParent(), true);
+    _zkClient.createPersistent(path, idealState.getRecord());
+    boolean success = _accessor.removeProperty(PropertyType.IDEALSTATES, resource);
+    AssertJUnit.assertTrue(success);
+    AssertJUnit.assertFalse(_zkClient.exists(path));
+    IdealState idealStateRead = _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
+    AssertJUnit.assertNull(idealStateRead);
+  }
+
+  /**
+   * Updates an existing ideal state through the accessor and checks that the
+   * stored record changed while the znode's creation time stayed the same and
+   * its modification time advanced.
+   */
+  @Test ()
+  public void testUpdate()
+  {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+    IdealState idealState = new IdealState(resource);
+    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+    // start from a clean znode: drop any previous one, make sure the parent exists
+    _zkClient.delete(path);
+    _zkClient.createPersistent(new File(path).getParent(), true);
+    _zkClient.createPersistent(path, idealState.getRecord());
+    Stat stat = _zkClient.getStat(path);
+
+    idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+
+    boolean success = _accessor.updateProperty(PropertyType.IDEALSTATES, idealState, resource);
+    AssertJUnit.assertTrue(success);
+    AssertJUnit.assertTrue(_zkClient.exists(path));
+    ZNRecord value = _zkClient.readData(path);
+    AssertJUnit.assertEquals(idealState.getRecord(), value);
+    Stat newstat = _zkClient.getStat(path);
+
+    AssertJUnit.assertEquals(stat.getCtime(), newstat.getCtime());
+    // Compare the primitive values: assertNotSame would autobox both longs and
+    // compare object identity, which holds regardless of the actual values.
+    AssertJUnit.assertFalse(stat.getMtime() == newstat.getMtime());
+    AssertJUnit.assertTrue(stat.getMtime() < newstat.getMtime());
+  }
+
+  /**
+   * Checks that listing external views yields an empty list.
+   */
+  @Test ()
+  public void testGetChildValues()
+  {
+    // NOTE(review): _clusterName is passed as a property key here although the
+    // accessor was already constructed with the cluster name - confirm that
+    // the intended path is the cluster's EXTERNALVIEW root.
+    List<ExternalView> list = _accessor.getChildValues(ExternalView.class, PropertyType.EXTERNALVIEW, _clusterName);
+    AssertJUnit.assertEquals(0, list.size());
+  }
+
+  /**
+   * Deletes and immediately re-creates a CONFIGS znode behind the accessor's
+   * back and checks that the accessor returns the new record.
+   */
+  @Test
+  public void testBackToBackRemoveAndSet()
+  {
+    // CONFIG is cached
+    _accessor.setProperty(PropertyType.CONFIGS, new ZNRecord("id1"), "config1");
+    ZNRecord record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
+    Assert.assertEquals(record.getId(), "id1");
+    String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName, "config1");
+    _zkClient.delete(path);
+    _zkClient.createPersistent(path, new ZNRecord("id1-new"));
+    record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
+    Assert.assertEquals(record.getId(), "id1-new", "Should update cache since creation time is changed.");
+  }
+
+  /**
+   * Opens the ZkClient, removes any leftover cluster znode tree and creates
+   * the accessor under test.
+   */
+  @BeforeClass
+  public void beforeClass() throws IOException, Exception
+  {
+    _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+    System.out.println("START TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    if (_zkClient.exists("/" + _clusterName))
+    {
+      _zkClient.deleteRecursive("/" + _clusterName);
+    }
+    _accessor = new ZKDataAccessor(_clusterName, _zkClient);
+  }
+
+  /**
+   * Closes the ZkClient opened in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+    System.out.println("END TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
new file mode 100644
index 0000000..a373a1f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests that {@link ZKDataAccessor#getChildValues} reflects znode changes
+ * (value change, child added, child deleted) that are made directly through a
+ * {@link ZkClient}, for several property types.
+ */
+public class TestZKDataAccessorCache extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZKDataAccessorCache.class);
+  private ZKDataAccessor _accessor;
+  private String _clusterName;
+  private ZkClient _zkClient;
+
+  /**
+   * Opens the ZkClient, wipes any leftover cluster znode tree, pre-creates the
+   * parent znodes used by {@link #testAccessorCache()} and creates the accessor.
+   */
+  @BeforeClass
+  public void beforeClass() throws IOException, Exception
+  {
+    _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+    // log marker uses this class's actual name so START/END lines can be matched to it
+    System.out.println("START TestZKDataAccessorCache at " + new Date(System.currentTimeMillis()));
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    if (_zkClient.exists("/" + _clusterName))
+    {
+      _zkClient.deleteRecursive("/" + _clusterName);
+    }
+    _zkClient.createPersistent(
+        PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+            ConfigScopeProperty.CLUSTER.toString(), _clusterName), true);
+    _zkClient.createPersistent(
+        PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+            ConfigScopeProperty.PARTICIPANT.toString()), true);
+    _zkClient.createPersistent(
+        PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+            ConfigScopeProperty.RESOURCE.toString()), true);
+    _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName),
+        true);
+    _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName),
+        true);
+    _zkClient.createPersistent(
+        PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, _clusterName), true);
+    _zkClient.createPersistent(
+        PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, _clusterName), true);
+    _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
+        "localhost_12918", "123456"), true);
+
+    _accessor = new ZKDataAccessor(_clusterName, _zkClient);
+  }
+
+  /**
+   * Closes the ZkClient opened in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+    System.out.println("END TestZKDataAccessorCache at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Runs the child-value scenario once for each property type whose parent
+   * znode was created in {@link #beforeClass()}.
+   */
+  @Test
+  public void testAccessorCache()
+  {
+    testAccessorCache(PropertyType.IDEALSTATES);
+    testAccessorCache(PropertyType.STATEMODELDEFS);
+    testAccessorCache(PropertyType.LIVEINSTANCES);
+    testAccessorCache(PropertyType.CONFIGS, ConfigScopeProperty.PARTICIPANT.toString());
+    testAccessorCache(PropertyType.EXTERNALVIEW);
+    testAccessorCache(PropertyType.CURRENTSTATES, "localhost_12918", "123456");
+  }
+
+  /**
+   * Creates children under the property's parent path directly via ZkClient and
+   * checks what getChildValues returns after: no change, a value change, an
+   * added child and a deleted child.
+   *
+   * @param type property type whose parent path is used
+   * @param keys additional path keys passed to PropertyPathConfig.getPath
+   */
+  private void testAccessorCache(PropertyType type, String... keys)
+  {
+    String parentPath = PropertyPathConfig.getPath(type, _clusterName, keys);
+    _zkClient.createPersistent(parentPath + "/child1", new ZNRecord("child1"));
+    ZNRecord record2 = new ZNRecord("child2");
+    _zkClient.createPersistent(parentPath + "/child2", record2);
+
+    List<ZNRecord> records = _accessor.getChildValues(type, keys);
+    LOG.debug("records:" + records);
+    Assert.assertNotNull(getRecord(records, "child1"));
+    Assert.assertNotNull(getRecord(records, "child2"));
+
+    // no data change
+    // NOTE(review): this compares by equals(); if the intent is to prove the
+    // cached instance is reused, assertSame may be what was meant - confirm.
+    List<ZNRecord> newRecords = _accessor.getChildValues(type, keys);
+    LOG.debug("new records:" + newRecords);
+    Assert.assertEquals(getRecord(newRecords, "child1"), getRecord(records, "child1"));
+
+    // change value of an existing znode
+    record2.setSimpleField("key1", "value1");
+    _zkClient.writeData(parentPath + "/child2", record2);
+    newRecords = _accessor.getChildValues(type, keys);
+    LOG.debug("new records:" + newRecords);
+    ZNRecord updatedChild2 = getRecord(newRecords, "child2");
+    // fail with an assertion rather than a NullPointerException if child2 is missing
+    Assert.assertNotNull(updatedChild2);
+    Assert.assertEquals(updatedChild2.getSimpleField("key1"), "value1");
+    Assert.assertNotSame(updatedChild2, getRecord(records, "child2"));
+
+    // add a new child
+    _zkClient.createPersistent(parentPath + "/child3", new ZNRecord("child3"));
+    records = newRecords;
+    newRecords = _accessor.getChildValues(type, keys);
+    LOG.debug("new records:" + newRecords);
+    Assert.assertNull(getRecord(records, "child3"));
+    Assert.assertNotNull(getRecord(newRecords, "child3"));
+
+    // delete a child
+    _zkClient.delete(parentPath + "/child2");
+    records = newRecords;
+    newRecords = _accessor.getChildValues(type, keys);
+    LOG.debug("new records:" + newRecords);
+    Assert.assertNotNull(getRecord(records, "child2"));
+    Assert.assertNull(getRecord(newRecords, "child2"),
+        "Should be null, since child2 has been deleted");
+  }
+
+  /**
+   * Returns the first record in {@code list} whose id equals {@code id}, or
+   * {@code null} if there is none.
+   */
+  private static ZNRecord getRecord(List<ZNRecord> list, String id)
+  {
+    for (ZNRecord record : list)
+    {
+      if (record.getId().equals(id))
+      {
+        return record;
+      }
+    }
+    return null;
+  }
+}