You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[36/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java b/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
new file mode 100644
index 0000000..3fc16be
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/MockFileHelixManager.java
@@ -0,0 +1,275 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.manager.file.FileHelixDataAccessor;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+public class MockFileHelixManager implements HelixManager
+{
+// private final FileDataAccessor _accessor;
+ private final HelixDataAccessor _accessor;
+ private final String _instanceName;
+ private final String _clusterName;
+ private final InstanceType _type;
+
+ public MockFileHelixManager(String clusterName, String instanceName, InstanceType type,
+ FilePropertyStore<ZNRecord> store)
+ {
+ _instanceName = instanceName;
+ _clusterName = clusterName;
+ _type = type;
+// _accessor = new FileDataAccessor(store, clusterName);
+ _accessor = new FileHelixDataAccessor(store, clusterName);
+ }
+
+ @Override
+ public void connect() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void disconnect()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addConfigChangeListener(ConfigChangeListener listener) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addMessageListener(MessageListener listener, String instanceName) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName,
+ String sessionId) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean removeListener(Object listener)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public DataAccessor getDataAccessor()
+ {
+ return null;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ @Override
+ public String getInstanceName()
+ {
+ return _instanceName;
+ }
+
+ @Override
+ public String getSessionId()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public long getLastNotificationTime()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void addControllerListener(ControllerChangeListener listener)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public HelixAdmin getClusterManagmentTool()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public PropertyStore<ZNRecord> getPropertyStore()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ParticipantHealthReportCollector getHealthReportCollector()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public InstanceType getInstanceType()
+ {
+ return _type;
+ }
+
+
+ @Override
+ public void addHealthStateChangeListener(HealthStateChangeListener listener,
+ String instanceName) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getVersion()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isLeader()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public ConfigAccessor getConfigAccessor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void startTimerTasks()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stopTimerTasks()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public HelixDataAccessor getHelixDataAccessor()
+ {
+ return _accessor;
+ }
+
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
new file mode 100644
index 0000000..77fe5a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.FileCallbackHandler;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestFileCallbackHandler
+{
+ @Test(groups = { "unitTest" })
+ public void testFileCallbackHandler()
+ {
+ final String clusterName = "TestFileCallbackHandler";
+ final String rootNamespace = "/tmp/" + clusterName;
+ final String instanceName = "controller_0";
+ MockListener listener = new MockListener();
+
+ PropertyJsonSerializer<ZNRecord> serializer =
+ new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
+ PropertyJsonComparator<ZNRecord> comparator =
+ new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+ FilePropertyStore<ZNRecord> store =
+ new FilePropertyStore<ZNRecord>(serializer, rootNamespace, comparator);
+ try
+ {
+ store.removeRootNamespace();
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ listener.reset();
+ MockFileHelixManager manager =
+ new MockFileHelixManager(clusterName, instanceName, InstanceType.CONTROLLER, store);
+ FileCallbackHandler handler =
+ new FileCallbackHandler(manager,
+ rootNamespace,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ ChangeType.CONFIG);
+ AssertJUnit.assertEquals(listener, handler.getListener());
+ AssertJUnit.assertEquals(rootNamespace, handler.getPath());
+ AssertJUnit.assertTrue(listener.isConfigChangeListenerInvoked);
+
+ handler =
+ new FileCallbackHandler(manager,
+ rootNamespace,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ ChangeType.EXTERNAL_VIEW);
+ AssertJUnit.assertTrue(listener.isExternalViewChangeListenerInvoked);
+
+ EventType[] eventTypes = new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated };
+ handler =
+ new FileCallbackHandler(manager,
+ rootNamespace,
+ listener,
+ eventTypes,
+ ChangeType.CONTROLLER);
+ AssertJUnit.assertEquals(handler.getEventTypes(), eventTypes);
+ AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+
+ listener.reset();
+ handler.reset();
+ AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+
+ listener.reset();
+ handler.onPropertyChange(rootNamespace);
+ AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+
+ store.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
new file mode 100644
index 0000000..ba1c6cc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestFileDataAccessor.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.file.FileDataAccessor;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestFileDataAccessor
+{
+ @Test()
+ public void testFileDataAccessor()
+ {
+ final String clusterName = "TestFileDataAccessor";
+ String rootNamespace = "/tmp/" + clusterName;
+ PropertyJsonSerializer<ZNRecord> serializer = new PropertyJsonSerializer<ZNRecord>(
+ ZNRecord.class);
+ PropertyJsonComparator<ZNRecord> comparator = new PropertyJsonComparator<ZNRecord>(
+ ZNRecord.class);
+ FilePropertyStore<ZNRecord> store = new FilePropertyStore<ZNRecord>(serializer, rootNamespace,
+ comparator);
+ try
+ {
+ store.removeRootNamespace();
+ } catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ DataAccessor accessor = new FileDataAccessor(store, clusterName);
+
+ InstanceConfig config = new InstanceConfig("id0");
+ accessor.setProperty(PropertyType.CONFIGS, config, ConfigScopeProperty.PARTICIPANT.toString(),
+ "key0");
+ config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+ ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+ AssertJUnit.assertEquals("id0", config.getId());
+
+ InstanceConfig newConfig = new InstanceConfig("id1");
+ accessor.updateProperty(PropertyType.CONFIGS, newConfig,
+ ConfigScopeProperty.PARTICIPANT.toString(),
+ "key0");
+ config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+ ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+ AssertJUnit.assertEquals("id1", config.getId());
+
+ accessor.removeProperty(PropertyType.CONFIGS, ConfigScopeProperty.PARTICIPANT.toString(),
+ "key0");
+ config = accessor.getProperty(InstanceConfig.class, PropertyType.CONFIGS,
+ ConfigScopeProperty.PARTICIPANT.toString(), "key0");
+ AssertJUnit.assertNull(config);
+
+ LiveInstance leader = new LiveInstance("id2");
+ accessor.updateProperty(PropertyType.LEADER, leader);
+ LiveInstance nullLeader = accessor.getProperty(LiveInstance.class, PropertyType.LEADER);
+ AssertJUnit.assertNull(nullLeader);
+
+ accessor.setProperty(PropertyType.LEADER, leader);
+ LiveInstance newLeader = new LiveInstance("id3");
+ accessor.updateProperty(PropertyType.LEADER, newLeader);
+ leader = accessor.getProperty(LiveInstance.class, PropertyType.LEADER);
+ AssertJUnit.assertEquals("id3", leader.getId());
+
+ // List<ZNRecord> childs = accessor.getChildValues(PropertyType.HISTORY);
+ // AssertJUnit.assertEquals(childs.size(), 0);
+
+ store.stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java b/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
new file mode 100644
index 0000000..388e753
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/TestStaticFileCM.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.helix.manager.file.StaticFileHelixManager.DBParam;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestStaticFileCM
+{
+ @Test()
+ public void testStaticFileCM()
+ {
+ final String clusterName = "TestSTaticFileCM";
+ final String controllerName = "controller_0";
+
+ ClusterView view;
+ String[] illegalNodesInfo = {"localhost_8900", "localhost_8901"};
+ List<DBParam> dbParams = new ArrayList<DBParam>();
+ dbParams.add(new DBParam("TestDB0", 10));
+ dbParams.add(new DBParam("TestDB1", 10));
+
+ boolean exceptionCaught = false;
+ try
+ {
+ view = StaticFileHelixManager.generateStaticConfigClusterView(illegalNodesInfo, dbParams, 3);
+ } catch (IllegalArgumentException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+ String[] nodesInfo = {"localhost:8900", "localhost:8901", "localhost:8902"};
+ view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, 2);
+
+ String configFile = "/tmp/" + clusterName;
+ ClusterViewSerializer.serialize(view, new File(configFile));
+ ClusterView restoredView = ClusterViewSerializer.deserialize(new File(configFile));
+ // System.out.println(restoredView);
+ // byte[] bytes = ClusterViewSerializer.serialize(restoredView);
+ // System.out.println(new String(bytes));
+
+ StaticFileHelixManager.verifyFileBasedClusterStates("localhost_8900",
+ configFile, configFile);
+
+ StaticFileHelixManager controller = new StaticFileHelixManager(clusterName, controllerName,
+ InstanceType.CONTROLLER, configFile);
+ controller.disconnect();
+ AssertJUnit.assertFalse(controller.isConnected());
+ controller.connect();
+ AssertJUnit.assertTrue(controller.isConnected());
+
+ String sessionId = controller.getSessionId();
+// AssertJUnit.assertEquals(DynamicFileClusterManager._sessionId, sessionId);
+ AssertJUnit.assertEquals(clusterName, controller.getClusterName());
+ AssertJUnit.assertEquals(0, controller.getLastNotificationTime());
+ AssertJUnit.assertEquals(InstanceType.CONTROLLER, controller.getInstanceType());
+ AssertJUnit.assertNull(controller.getPropertyStore());
+ AssertJUnit.assertNull(controller.getHealthReportCollector());
+ AssertJUnit.assertEquals(controllerName, controller.getInstanceName());
+ AssertJUnit.assertNull(controller.getClusterManagmentTool());
+ AssertJUnit.assertNull(controller.getMessagingService());
+
+ MockListener controllerListener = new MockListener();
+ AssertJUnit.assertFalse(controller.removeListener(controllerListener));
+ controllerListener.reset();
+
+ controller.addIdealStateChangeListener(controllerListener);
+ AssertJUnit.assertTrue(controllerListener.isIdealStateChangeListenerInvoked);
+
+ controller.addMessageListener(controllerListener, "localhost_8900");
+ AssertJUnit.assertTrue(controllerListener.isMessageListenerInvoked);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addLiveInstanceChangeListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addCurrentStateChangeListener(controllerListener, "localhost_8900", sessionId);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addConfigChangeListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addExternalViewChangeListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addControllerListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java b/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
new file mode 100644
index 0000000..8db8802
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/file/UnitTestDynFileClusterMgr.java
@@ -0,0 +1,282 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.util.List;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.file.DynamicFileHelixManager;
+import org.apache.helix.manager.file.FileHelixAdmin;
+import org.apache.helix.mock.storage.DummyProcess;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class UnitTestDynFileClusterMgr
+{
+ final String className = "UnitTestDynFileClusterMgr";
+ final String _rootNamespace = "/tmp/" + className;
+ final String instanceName = "localhost_12918";
+
+ FilePropertyStore<ZNRecord> _store;
+ HelixAdmin _mgmtTool;
+
+ @BeforeClass
+ public void beforeClass()
+ {
+ _store = new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+ _rootNamespace, new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+ try
+ {
+ _store.removeRootNamespace();
+ } catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ _mgmtTool = new FileHelixAdmin(_store);
+ }
+
+ @Test()
+ public void testBasic() throws PropertyStoreException
+ {
+ final String clusterName = className + "_basic";
+ String controllerName = "controller_0";
+ DynamicFileHelixManager controller = new DynamicFileHelixManager(clusterName,
+ controllerName, InstanceType.CONTROLLER, _store);
+
+ _mgmtTool.addCluster(clusterName, true);
+ _mgmtTool.addInstance(clusterName, new InstanceConfig(instanceName));
+
+ DynamicFileHelixManager participant = new DynamicFileHelixManager(clusterName,
+ instanceName, InstanceType.PARTICIPANT, _store);
+
+ AssertJUnit.assertEquals(instanceName, participant.getInstanceName());
+
+ controller.disconnect();
+ AssertJUnit.assertFalse(controller.isConnected());
+ controller.connect();
+ AssertJUnit.assertTrue(controller.isConnected());
+
+ String sessionId = controller.getSessionId();
+ // AssertJUnit.assertEquals(DynamicFileClusterManager._sessionId,
+ // sessionId);
+ AssertJUnit.assertEquals(clusterName, controller.getClusterName());
+ AssertJUnit.assertEquals(0, controller.getLastNotificationTime());
+ AssertJUnit.assertEquals(InstanceType.CONTROLLER, controller.getInstanceType());
+
+ // AssertJUnit.assertNull(controller.getPropertyStore());
+ PropertyStore<ZNRecord> propertyStore = controller.getPropertyStore();
+ AssertJUnit.assertNotNull(propertyStore);
+ propertyStore.setProperty("testKey", new ZNRecord("testValue"));
+ ZNRecord record = propertyStore.getProperty("testKey");
+ Assert.assertEquals(record.getId(), "testValue");
+
+ AssertJUnit.assertNull(controller.getHealthReportCollector());
+
+ MockListener controllerListener = new MockListener();
+ controllerListener.reset();
+
+ controller.addIdealStateChangeListener(controllerListener);
+ AssertJUnit.assertTrue(controllerListener.isIdealStateChangeListenerInvoked);
+
+ controller.addLiveInstanceChangeListener(controllerListener);
+ AssertJUnit.assertTrue(controllerListener.isLiveInstanceChangeListenerInvoked);
+
+ controller.addCurrentStateChangeListener(controllerListener, controllerName, sessionId);
+ AssertJUnit.assertTrue(controllerListener.isCurrentStateChangeListenerInvoked);
+
+ boolean exceptionCaught = false;
+ try
+ {
+ controller.addConfigChangeListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addExternalViewChangeListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addControllerListener(controllerListener);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ AssertJUnit.assertFalse(controller.removeListener(controllerListener));
+
+ exceptionCaught = false;
+ try
+ {
+ controller.addIdealStateChangeListener(null);
+ } catch (HelixException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ // test message service
+ ClusterMessagingService msgService = controller.getMessagingService();
+
+ // test file management tool
+ HelixAdmin tool = controller.getClusterManagmentTool();
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getClusters();
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getResourcesInCluster(clusterName);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.addResource(clusterName, "resource", 10, "MasterSlave",
+ IdealStateModeProperty.AUTO.toString());
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getStateModelDefs(clusterName);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getInstanceConfig(clusterName, instanceName);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getStateModelDef(clusterName, "MasterSlave");
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.getResourceExternalView(clusterName, "resource");
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ exceptionCaught = false;
+ try
+ {
+ tool.enableInstance(clusterName, "resource", false);
+ } catch (UnsupportedOperationException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ tool.addCluster(clusterName, true);
+ tool.addResource(clusterName, "resource", 10, "MasterSlave");
+ InstanceConfig config = new InstanceConfig("nodeConfig");
+ tool.addInstance(clusterName, config);
+ List<String> instances = tool.getInstancesInCluster(clusterName);
+ AssertJUnit.assertEquals(1, instances.size());
+ tool.dropInstance(clusterName, config);
+
+ IdealState idealState = new IdealState("idealState");
+ tool.setResourceIdealState(clusterName, "resource", idealState);
+ idealState = tool.getResourceIdealState(clusterName, "resource");
+ AssertJUnit.assertEquals(idealState.getId(), "idealState");
+
+ tool.dropResource(clusterName, "resource");
+ _store.stop();
+ }
+
+ @Test
+ public void testStartInstanceBeforeAdd()
+ {
+ final String clusterName = className + "_startInsB4Add";
+ _mgmtTool.addCluster(clusterName, true);
+
+ try
+ {
+ new DummyProcess(null, clusterName, instanceName, "dynamic-file", null, 0, _store).start();
+ Assert.fail("Should fail since instance is not configured");
+ } catch (HelixException e)
+ {
+ // OK
+ } catch (Exception e)
+ {
+ Assert.fail("Should not fail on exceptions other than ClusterManagerException");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
new file mode 100644
index 0000000..f3e6798
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory;
+import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory.DefaultControllerMessageHandler;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestDefaultControllerMsgHandlerFactory
+{
+ @Test()
+ public void testDefaultControllerMsgHandlerFactory()
+ {
+ System.out.println("START TestDefaultControllerMsgHandlerFactory at " + new Date(System.currentTimeMillis()));
+
+ DefaultControllerMessageHandlerFactory facotry = new DefaultControllerMessageHandlerFactory();
+
+ Message message = new Message(MessageType.NO_OP, "0");
+ NotificationContext context = new NotificationContext(null);
+
+ boolean exceptionCaught = false;
+ try
+ {
+ MessageHandler handler = facotry.createHandler(message, context);
+ } catch (HelixException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ message = new Message(MessageType.CONTROLLER_MSG, "1");
+ exceptionCaught = false;
+ try
+ {
+ MessageHandler handler = facotry.createHandler(message, context);
+ } catch (HelixException e)
+ {
+ exceptionCaught = true;
+ }
+ AssertJUnit.assertFalse(exceptionCaught);
+
+ Map<String, String> resultMap = new HashMap<String, String>();
+ message = new Message(MessageType.NO_OP, "3");
+ DefaultControllerMessageHandler defaultHandler = new DefaultControllerMessageHandler(message, context);
+ try
+ {
+ defaultHandler.handleMessage();
+ } catch (HelixException e)
+ {
+ exceptionCaught = true;
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ AssertJUnit.assertTrue(exceptionCaught);
+
+ message = new Message(MessageType.CONTROLLER_MSG, "4");
+ defaultHandler = new DefaultControllerMessageHandler(message, context);
+ exceptionCaught = false;
+ try
+ {
+ defaultHandler.handleMessage();
+ } catch (HelixException e)
+ {
+ exceptionCaught = true;
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(exceptionCaught);
+ System.out.println("END TestDefaultControllerMsgHandlerFactory at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
new file mode 100644
index 0000000..cfacedb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -0,0 +1,71 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHandleNewSession extends ZkIntegrationTestBase
+{
+ @Test
+ public void testHandleNewSession() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ZKHelixManager manager =
+ new ZKHelixManager(clusterName,
+ "localhost_12918",
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ manager.connect();
+
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String lastSessionId = manager.getSessionId();
+ for (int i = 0; i < 3; i++)
+ {
+ // System.err.println("curSessionId: " + lastSessionId);
+ ZkTestHelper.expireSession(manager._zkClient);
+
+ String sessionId = manager.getSessionId();
+ Assert.assertTrue(sessionId.compareTo(lastSessionId) > 0, "Session id should be increased after expiry");
+ lastSessionId = sessionId;
+
+ // make sure session id is not 0
+ Assert.assertFalse(sessionId.equals("0"),
+ "Hit race condition in zhclient.handleNewSession(). sessionId is not returned yet.");
+
+ // TODO: need to test session expiry during handleNewSession()
+ }
+
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("Disconnecting ...");
+ manager.disconnect();
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
new file mode 100644
index 0000000..53da22b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ @Test
+ public void testInstanceBounce() throws Exception
+ {
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ StartCMResult controllerResult = _startCMResultMap.get(controllerName);
+ ZKHelixManager controller = (ZKHelixManager) controllerResult._manager;
+ int handlerSize = controller.getHandlers().size();
+
+ for (int i = 0; i < 2; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ // kill 2 participants
+ _startCMResultMap.get(instanceName)._manager.disconnect();
+ _startCMResultMap.get(instanceName)._thread.interrupt();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ // restart the participant
+ StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _startCMResultMap.put(instanceName, result);
+ Thread.sleep(100);
+ }
+ Thread.sleep(2000);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME), 50 * 1000);
+ Assert.assertTrue(result);
+ // When a new live instance is created, we still add current state listener to it thus number should increase by 2
+ Assert.assertEquals( controller.getHandlers().size(), handlerSize + 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
new file mode 100644
index 0000000..3816f52
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpMultiThread.java
@@ -0,0 +1,278 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWtCacheAsyncOpMultiThread extends ZkUnitTestBase
+{
+ class TestCreateZkCacheBaseDataAccessor implements Callable<Boolean>
+ {
+ final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+ final String _clusterName;
+ final int _id;
+
+ public TestCreateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+ {
+ _accessor = accessor;
+ _clusterName = clusterName;
+ _id = id;
+ }
+
+ @Override
+ public Boolean call() throws Exception
+ {
+ // create 10 current states in 2 steps
+ List<String> paths = new ArrayList<String>();
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ for (int j = 0; j < 2; j++)
+ {
+ paths.clear();
+ records.clear();
+
+ if (_id == 1 && j == 0)
+ {
+ // let thread_0 create 0-4
+ Thread.sleep(30);
+ }
+
+ if (_id == 0 && j == 1)
+ {
+ // let thread_1 create 5-9
+ Thread.sleep(100);
+ }
+
+
+ for (int i = 0; i < 5; i++)
+ {
+ int k = j * 5 + i;
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ _clusterName,
+ "localhost_8901",
+ "session_0",
+ "TestDB" + k);
+ ZNRecord record = new ZNRecord("TestDB" + k);
+
+ paths.add(path);
+ records.add(record);
+ }
+
+ boolean[] success = _accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+ // System.out.println("thread-" + _id + " creates " + j + ": " + Arrays.toString(success));
+
+ // create all all sync'ed, so we shall see either all true or all false
+ for (int i = 1; i < 5; i++)
+ {
+ Assert.assertEquals(success[i], success[0], "Should be either all succeed of all fail");
+ }
+ }
+
+ return true;
+ }
+ }
+
+ class TestUpdateZkCacheBaseDataAccessor implements Callable<Boolean>
+ {
+ final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+ final String _clusterName;
+ final int _id;
+
+ public TestUpdateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+ {
+ _accessor = accessor;
+ _clusterName = clusterName;
+ _id = id;
+ }
+
+ @Override
+ public Boolean call() throws Exception
+ {
+ // create 10 current states in 2 steps
+ List<String> paths = new ArrayList<String>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+ for (int j = 0; j < 10; j++)
+ {
+ paths.clear();
+ updaters.clear();
+
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ _clusterName,
+ "localhost_8901",
+ "session_0",
+ "TestDB" + i);
+
+ ZNRecord newRecord = new ZNRecord("TestDB" + i);
+ newRecord.setSimpleField("" + j, "" + j);
+ DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
+ paths.add(path);
+ updaters.add(updater);
+ }
+
+ boolean[] success = _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+ // System.out.println("thread-" + _id + " updates " + j + ": " + Arrays.toString(success));
+
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertEquals(success[i], true, "Should be all succeed");
+ }
+ }
+
+ return true;
+ }
+ }
+
+ class TestSetZkCacheBaseDataAccessor implements Callable<Boolean>
+ {
+ final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
+ final String _clusterName;
+ final int _id;
+
+ public TestSetZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
+ {
+ _accessor = accessor;
+ _clusterName = clusterName;
+ _id = id;
+ }
+
+ @Override
+ public Boolean call() throws Exception
+ {
+ // create 10 current states in 2 steps
+ List<String> paths = new ArrayList<String>();
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ for (int j = 0; j < 2; j++)
+ {
+ paths.clear();
+ records.clear();
+
+ if (_id == 1 && j == 0)
+ {
+ // let thread_0 create 0-4
+ Thread.sleep(30);
+ }
+
+ if (_id == 0 && j == 1)
+ {
+ // let thread_1 create 5-9
+ Thread.sleep(100);
+ }
+
+ for (int i = 0; i < 5; i++)
+ {
+ int k = j * 5 + i;
+ String path =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName, "TestDB" + k);
+ ZNRecord record = new ZNRecord("TestDB" + k);
+
+ paths.add(path);
+ records.add(record);
+ }
+ boolean[] success = _accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+ // System.out.println("thread-" + _id + " sets " + j + ": " + Arrays.toString(success));
+
+ for (int i = 0; i < 5; i++)
+ {
+ Assert.assertEquals(success[i], true);
+ }
+ }
+
+ return true;
+ }
+ }
+
+ @Test
+ public void testHappyPathZkCacheBaseDataAccessor()
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // init zkCacheDataAccessor
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901");
+ String extViewPath =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+ baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+ List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+ ZkCacheBaseDataAccessor<ZNRecord> accessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+ null,
+ cachePaths,
+ null);
+
+ // TestHelper.printCache(accessor._wtCache);
+ boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ // create 10 current states using 2 threads
+ List<Callable<Boolean>> threads = new ArrayList<Callable<Boolean>>();
+ for (int i = 0; i < 2; i++)
+ {
+ threads.add(new TestCreateZkCacheBaseDataAccessor(accessor, clusterName, i));
+ }
+ TestHelper.startThreadsConcurrently(threads, 1000);
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ // update 10 current states 10 times using 2 threads
+ threads.clear();
+ for (int i = 0; i < 2; i++)
+ {
+ threads.add(new TestUpdateZkCacheBaseDataAccessor(accessor, clusterName, i));
+ }
+ TestHelper.startThreadsConcurrently(threads, 1000);
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ // set 10 external views using 2 threads
+ threads.clear();
+ for (int i = 0; i < 2; i++)
+ {
+ threads.add(new TestSetZkCacheBaseDataAccessor(accessor, clusterName, i));
+ }
+ TestHelper.startThreadsConcurrently(threads, 1000);
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
new file mode 100644
index 0000000..ef47a8a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java
@@ -0,0 +1,253 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWtCacheAsyncOpSingleThread extends ZkUnitTestBase
+{
+ @Test
+ public void testHappyPathZkCacheBaseDataAccessor()
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // init zkCacheDataAccessor
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901");
+ String extViewPath =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+ baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+ List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+ ZkCacheBaseDataAccessor<ZNRecord> accessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+ null,
+ cachePaths,
+ null);
+
+ boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // create 10 current states
+ List<String> paths = new ArrayList<String>();
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_0",
+ "TestDB" + i);
+ ZNRecord record = new ZNRecord("TestDB" + i);
+
+ paths.add(path);
+ records.add(record);
+ }
+
+ boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
+ }
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // update each current state 10 times
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+ for (int j = 0; j < 10; j++)
+ {
+ paths.clear();
+ updaters.clear();
+ for (int i = 0; i < 10; i++)
+ {
+ String path = curStatePath + "/session_0/TestDB" + i;
+ ZNRecord newRecord = new ZNRecord("TestDB" + i);
+ newRecord.setSimpleField("" + j, "" + j);
+ DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
+ paths.add(path);
+ updaters.add(updater);
+ }
+ success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
+ }
+ }
+
+ // verify cache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // set 10 external views
+ paths.clear();
+ records.clear();
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+ ZNRecord record = new ZNRecord("TestDB" + i);
+
+ paths.add(path);
+ records.add(record);
+ }
+ success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
+ }
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // get 10 external views
+ paths.clear();
+ records.clear();
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+ paths.add(path);
+ }
+
+ records = accessor.get(paths, null, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+ }
+
+ // getChildren
+ records.clear();
+ records = accessor.getChildren(extViewPath, null, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+ }
+
+ // exists
+ paths.clear();
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_0",
+ "TestDB" + i);
+ paths.add(path);
+ }
+ success = accessor.exists(paths, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(success[i], "Should exits: TestDB" + i);
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCreateFailZkCacheBaseAccessor()
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // init zkCacheDataAccessor
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901");
+ String extViewPath =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+ baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+ ZkCacheBaseDataAccessor<ZNRecord> accessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+ null,
+ Arrays.asList(curStatePath, extViewPath),
+ null);
+
+ Assert.assertEquals(accessor._wtCache._cache.size(), 1, "Should contain only:\n"
+ + curStatePath);
+ Assert.assertTrue(accessor._wtCache._cache.containsKey(curStatePath));
+
+ // create 10 current states
+ List<String> paths = new ArrayList<String>();
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_1",
+ "TestDB" + i);
+ ZNRecord record = new ZNRecord("TestDB" + i);
+
+ paths.add(path);
+ records.add(record);
+ }
+
+ boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
+ }
+
+
+ // create same 10 current states again, should fail on NodeExists
+ success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+ // System.out.println(Arrays.toString(success));
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertFalse(success[i], "Should fail on create: " + paths.get(i));
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
new file mode 100644
index 0000000..a9d81e1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
@@ -0,0 +1,196 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWtCacheSyncOpSingleThread extends ZkUnitTestBase
+{
+ // TODO: add TestZkCacheSyncOpSingleThread
+ // TODO: add TestZkCacheAsyncOpMultiThread
+ @Test
+ public void testHappyPathZkCacheBaseDataAccessor() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // init zkCacheDataAccessor
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901");
+ String extViewPath =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+ baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+ List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+ ZkCacheBaseDataAccessor<ZNRecord> accessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+ null,
+ cachePaths,
+ null);
+
+ boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, true);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // create 10 current states
+ for (int i = 0; i < 10; i++)
+ {
+ String path = curStatePath + "/session_0/TestDB" + i;
+ boolean success =
+ accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
+ Assert.assertTrue(success, "Should succeed in create: " + path);
+ }
+
+ // verify wtCache
+ // TestHelper.printCache(accessor._wtCache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ // update each current state 10 times, single thread
+ for (int i = 0; i < 10; i++)
+ {
+ String path = curStatePath + "/session_0/TestDB" + i;
+ for (int j = 0; j < 10; j++)
+ {
+ ZNRecord newRecord = new ZNRecord("TestDB" + i);
+ newRecord.setSimpleField("" + j, "" + j);
+ boolean success =
+ accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+ Assert.assertTrue(success, "Should succeed in update: " + path);
+
+ }
+ }
+
+ // verify cache
+// TestHelper.printCache(accessor._wtCache._cache);
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+ // set 10 external views
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+ boolean success = accessor.set(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
+ Assert.assertTrue(success, "Should succeed in set: " + path);
+ }
+
+ // verify wtCache
+ // accessor.printWtCache();
+ ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
+ Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
+
+
+ // get 10 external views
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
+ ZNRecord record = accessor.get(path, null, 0);
+ Assert.assertEquals(record.getId(), "TestDB" + i);
+ }
+
+ // getChildNames
+ List<String> childNames = accessor.getChildNames(extViewPath, 0);
+ // System.out.println(childNames);
+ Assert.assertEquals(childNames.size(), 10, "Should contain only: TestDB0-9");
+ for (int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(childNames.contains("TestDB" + i));
+ }
+
+ // exists
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_0",
+ "TestDB" + i);
+
+ Assert.assertTrue(accessor.exists(path, 0));
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCreateFailZkCacheBaseDataAccessor()
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // init zkCacheDataAccessor
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901");
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+ ZkCacheBaseDataAccessor<ZNRecord> accessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
+ null,
+ Arrays.asList(curStatePath),
+ null);
+
+ // create 10 current states
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_1",
+ "TestDB" + i);
+ boolean success =
+ accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
+ Assert.assertTrue(success, "Should succeed in create: " + path);
+ }
+
+ // create same 10 current states again, should fail
+ for (int i = 0; i < 10; i++)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ clusterName,
+ "localhost_8901",
+ "session_1",
+ "TestDB" + i);
+ boolean success =
+ accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
+ Assert.assertFalse(success, "Should fail in create due to NodeExists: " + path);
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
new file mode 100644
index 0000000..092403a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessor.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
+public class TestZKDataAccessor extends ZkUnitTestBase
+{
+ private DataAccessor _accessor;
+ private String _clusterName;
+ private final String resource = "resource";
+ private ZkClient _zkClient;
+
+ @Test ()
+ public void testSet()
+ {
+ IdealState idealState = new IdealState(resource);
+ idealState.setNumPartitions(20);
+ idealState.setReplicas(Integer.toString(2));
+ idealState.setStateModelDefRef("StateModel1");
+ idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+ boolean success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
+ AssertJUnit.assertTrue(success);
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+ AssertJUnit.assertTrue(_zkClient.exists(path));
+ AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
+
+ idealState.setNumPartitions(20);
+ success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
+ AssertJUnit.assertTrue(success);
+ AssertJUnit.assertTrue(_zkClient.exists(path));
+ AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
+ }
+
+ @Test ()
+ public void testGet()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+ IdealState idealState = new IdealState(resource);
+ idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+ _zkClient.delete(path);
+ _zkClient.createPersistent(new File(path).getParent(), true);
+ _zkClient.createPersistent(path, idealState.getRecord());
+ IdealState idealStateRead = _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
+ AssertJUnit.assertEquals(idealState.getRecord(), idealStateRead.getRecord());
+ }
+
+ @Test ()
+ public void testRemove()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+ IdealState idealState = new IdealState(resource);
+ idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+ _zkClient.delete(path);
+ _zkClient.createPersistent(new File(path).getParent(), true);
+ _zkClient.createPersistent(path, idealState.getRecord());
+ boolean success = _accessor.removeProperty(PropertyType.IDEALSTATES, resource);
+ AssertJUnit.assertTrue(success);
+ AssertJUnit.assertFalse(_zkClient.exists(path));
+ IdealState idealStateRead = _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
+ AssertJUnit.assertNull(idealStateRead);
+
+ }
+
+ @Test ()
+ public void testUpdate()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
+ IdealState idealState = new IdealState(resource);
+ idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+ _zkClient.delete(path);
+ _zkClient.createPersistent(new File(path).getParent(), true);
+ _zkClient.createPersistent(path, idealState.getRecord());
+ Stat stat = _zkClient.getStat(path);
+
+ idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+
+ boolean success = _accessor.updateProperty(PropertyType.IDEALSTATES, idealState, resource);
+ AssertJUnit.assertTrue(success);
+ AssertJUnit.assertTrue(_zkClient.exists(path));
+ ZNRecord value = _zkClient.readData(path);
+ AssertJUnit.assertEquals(idealState.getRecord(), value);
+ Stat newstat = _zkClient.getStat(path);
+
+ AssertJUnit.assertEquals(stat.getCtime(), newstat.getCtime());
+ AssertJUnit.assertNotSame(stat.getMtime(), newstat.getMtime());
+ AssertJUnit.assertTrue(stat.getMtime() < newstat.getMtime());
+ }
+
+ @Test ()
+ public void testGetChildValues()
+ {
+ List<ExternalView> list = _accessor.getChildValues(ExternalView.class, PropertyType.EXTERNALVIEW, _clusterName);
+ AssertJUnit.assertEquals(0, list.size());
+ }
+
+ @Test
+ public void testBackToBackRemoveAndSet()
+ {
+ // CONFIG is cached
+ _accessor.setProperty(PropertyType.CONFIGS, new ZNRecord("id1"), "config1");
+ ZNRecord record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
+ // System.out.println(record.getId());
+ Assert.assertEquals(record.getId(), "id1");
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName, "config1");
+ _zkClient.delete(path);
+ _zkClient.createPersistent(path, new ZNRecord("id1-new"));
+ record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
+ // System.out.println(record.getId());
+ Assert.assertEquals(record.getId(), "id1-new", "Should update cache since creation time is changed.");
+ }
+
+ @BeforeClass
+ public void beforeClass() throws IOException, Exception
+ {
+ _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+ System.out.println("START TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ if (_zkClient.exists("/" + _clusterName))
+ {
+ _zkClient.deleteRecursive("/" + _clusterName);
+ }
+ _accessor = new ZKDataAccessor(_clusterName, _zkClient);
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close();
+ System.out.println("END TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
new file mode 100644
index 0000000..a373a1f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKDataAccessorCache.java
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKDataAccessorCache extends ZkUnitTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestZKDataAccessorCache.class);
+ private ZKDataAccessor _accessor;
+ private String _clusterName;
+ private ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass() throws IOException, Exception
+ {
+ _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+ System.out.println("START TestZKCacheDataAccessor at " + new Date(System.currentTimeMillis()));
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ if (_zkClient.exists("/" + _clusterName))
+ {
+ _zkClient.deleteRecursive("/" + _clusterName);
+ }
+ _zkClient.createPersistent(
+ PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+ ConfigScopeProperty.CLUSTER.toString(), _clusterName), true);
+ _zkClient.createPersistent(
+ PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString()), true);
+ _zkClient.createPersistent(
+ PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
+ ConfigScopeProperty.RESOURCE.toString()), true);
+ _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName),
+ true);
+ _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName),
+ true);
+ _zkClient.createPersistent(
+ PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, _clusterName), true);
+ _zkClient.createPersistent(
+ PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, _clusterName), true);
+ _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
+ "localhost_12918", "123456"), true);
+
+ _accessor = new ZKDataAccessor(_clusterName, _zkClient);
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close();
+ System.out.println("END TestZKCacheDataAccessor at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testAccessorCache()
+ {
+ testAccessorCache(PropertyType.IDEALSTATES);
+ testAccessorCache(PropertyType.STATEMODELDEFS);
+ testAccessorCache(PropertyType.LIVEINSTANCES);
+ testAccessorCache(PropertyType.CONFIGS, ConfigScopeProperty.PARTICIPANT.toString());
+ testAccessorCache(PropertyType.EXTERNALVIEW);
+ testAccessorCache(PropertyType.CURRENTSTATES, "localhost_12918", "123456");
+ }
+
+ private void testAccessorCache(PropertyType type, String... keys)
+ {
+ String parentPath = PropertyPathConfig.getPath(type, _clusterName, keys);
+ _zkClient.createPersistent(parentPath + "/child1", new ZNRecord("child1"));
+ ZNRecord record2 = new ZNRecord("child2");
+ _zkClient.createPersistent(parentPath + "/child2", record2);
+
+ List<ZNRecord> records = _accessor.getChildValues(type, keys);
+ LOG.debug("records:" + records);
+ Assert.assertNotNull(getRecord(records, "child1"));
+ Assert.assertNotNull(getRecord(records, "child2"));
+
+ // no data change
+ List<ZNRecord> newRecords = _accessor.getChildValues(type, keys);
+ LOG.debug("new records:" + newRecords);
+ Assert.assertEquals(getRecord(newRecords, "child1"), getRecord(records, "child1"));
+
+ // change value of an existing znode
+ record2.setSimpleField("key1", "value1");
+ _zkClient.writeData(parentPath + "/child2", record2);
+ newRecords = _accessor.getChildValues(type, keys);
+ LOG.debug("new records:" + newRecords);
+ Assert.assertEquals(getRecord(newRecords, "child2").getSimpleField("key1"), "value1");
+ Assert.assertNotSame(getRecord(newRecords, "child2"), getRecord(records, "child2"));
+
+ // add a new child
+ _zkClient.createPersistent(parentPath + "/child3", new ZNRecord("child3"));
+ records = newRecords;
+ newRecords = _accessor.getChildValues(type, keys);
+ LOG.debug("new records:" + newRecords);
+ Assert.assertNull(getRecord(records, "child3"));
+ Assert.assertNotNull(getRecord(newRecords, "child3"));
+
+ // delete a child
+ _zkClient.delete(parentPath + "/child2");
+ records = newRecords;
+ newRecords = _accessor.getChildValues(type, keys);
+ LOG.debug("new records:" + newRecords);
+ Assert.assertNotNull(getRecord(records, "child2"));
+ Assert.assertNull(getRecord(newRecords, "child2"),
+ "Should be null, since child2 has been deleted");
+ }
+
+ private ZNRecord getRecord(List<ZNRecord> list, String id)
+ {
+ for (ZNRecord record : list)
+ {
+ if (record.getId().equals(id))
+ {
+ return record;
+ }
+ }
+ return null;
+ }
+}