You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[37/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
new file mode 100644
index 0000000..890d862
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
@@ -0,0 +1,152 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that a {@link LiveInstanceChangeListener} registered through an
+ * ADMINISTRATOR connection is notified when a participant joins the cluster,
+ * when the participant's live-instance node is updated, and when the
+ * participant disconnects.
+ */
+public class TestZKLiveInstanceData extends ZkUnitTestBase
+{
+ private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+ /**
+  * Drives one participant through join / update / leave and asserts on the
+  * snapshot delivered to the listener after each step.
+  *
+  * @throws Exception if connecting to the cluster fails or the thread is
+  *           interrupted while waiting for a notification
+  */
+ @Test
+ public void testDataChange() throws Exception
+ {
+   // Create an admin and add LiveInstanceChange listener to it
+   HelixManager adminManager =
+       HelixManagerFactory.getZKHelixManager(clusterName,
+                                             null,
+                                             InstanceType.ADMINISTRATOR,
+                                             ZK_ADDR);
+   HelixManager manager = null;
+   try
+   {
+     adminManager.connect();
+     final BlockingQueue<List<LiveInstance>> changeList =
+         new LinkedBlockingQueue<List<LiveInstance>>();
+
+     adminManager.addLiveInstanceChangeListener(new LiveInstanceChangeListener()
+     {
+       @Override
+       public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+                                        NotificationContext changeContext)
+       {
+         // The queue is basically unbounded, so shouldn't throw exception when calling
+         // "add".
+         changeList.add(deepCopy(liveInstances));
+       }
+     });
+
+     // Check the initial condition
+     List<LiveInstance> instances = changeList.poll(1, TimeUnit.SECONDS);
+     Assert.assertNotNull(instances, "Expecting a list of live instance");
+     Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
+
+     // Join as participant, should trigger a live instance change event
+     manager =
+         HelixManagerFactory.getZKHelixManager(clusterName,
+                                               "localhost_54321",
+                                               InstanceType.PARTICIPANT,
+                                               ZK_ADDR);
+     manager.connect();
+     instances = changeList.poll(1, TimeUnit.SECONDS);
+     Assert.assertNotNull(instances, "Expecting a list of live instance");
+     Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
+     Assert.assertEquals(instances.get(0).getInstanceName(), manager.getInstanceName());
+
+     // Update data in the live instance node, should trigger another live instance change
+     // event
+     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+     PropertyKey propertyKey =
+         helixDataAccessor.keyBuilder().liveInstance(manager.getInstanceName());
+     LiveInstance instance = helixDataAccessor.getProperty(propertyKey);
+
+     Map<String, String> map = new TreeMap<String, String>();
+     map.put("k1", "v1");
+     instance.getRecord().setMapField("test", map);
+     Assert.assertTrue(helixDataAccessor.updateProperty(propertyKey, instance),
+                       "Failed to update live instance node");
+
+     instances = changeList.poll(1, TimeUnit.SECONDS);
+     Assert.assertNotNull(instances, "Expecting a list of live instance");
+     // Guard the get(0) below so an empty notification fails with a clear message
+     // instead of an IndexOutOfBoundsException.
+     Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
+     Assert.assertEquals(instances.get(0).getRecord().getMapField("test"),
+                         map,
+                         "Wrong map data.");
+
+     // Leave the cluster, should trigger a change event with no live instances
+     manager.disconnect();
+     Thread.sleep(1000); // wait for callback finish
+
+     instances = changeList.poll(1, TimeUnit.SECONDS);
+     Assert.assertNotNull(instances, "Expecting a list of live instance");
+     Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
+   }
+   finally
+   {
+     // Release both connections even when an assertion above failed, so a
+     // failing run does not leave connected managers behind for later tests.
+     try
+     {
+       disconnectIfConnected(manager);
+     }
+     finally
+     {
+       disconnectIfConnected(adminManager);
+     }
+   }
+ }
+
+ /**
+  * Removes any cluster left under this test's name and recreates it with two
+  * registered nodes, localhost:54321 and localhost:54322.
+  *
+  * @throws Exception if the cluster setup commands fail
+  */
+ @BeforeClass()
+ public void beforeClass() throws Exception
+ {
+   ZkClient zkClient = null;
+   try
+   {
+     zkClient = new ZkClient(ZK_ADDR);
+     zkClient.setZkSerializer(new ZNRecordSerializer());
+     if (zkClient.exists("/" + clusterName))
+     {
+       zkClient.deleteRecursive("/" + clusterName);
+     }
+   }
+   finally
+   {
+     if (zkClient != null)
+     {
+       zkClient.close();
+     }
+   }
+
+   ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                               ZK_ADDR,
+                                               "-addCluster",
+                                               clusterName));
+   ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                               ZK_ADDR,
+                                               "-addNode",
+                                               clusterName,
+                                               "localhost:54321"));
+   ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
+                                               ZK_ADDR,
+                                               "-addNode",
+                                               clusterName,
+                                               "localhost:54322"));
+ }
+
+ /**
+  * Packs the given varargs into a {@code String[]} for the command-line
+  * style {@code ClusterSetup} entry point.
+  */
+ private String[] getArgs(String... args)
+ {
+   return args;
+ }
+
+ /**
+  * Builds a new list holding a new {@link LiveInstance} wrapper around each
+  * element's record, so queued snapshots do not share wrapper objects with
+  * the list handed to the callback.
+  * NOTE(review): whether the underlying record itself is copied depends on
+  * the LiveInstance constructor - confirm.
+  */
+ private List<LiveInstance> deepCopy(List<LiveInstance> instances)
+ {
+   List<LiveInstance> result = new ArrayList<LiveInstance>();
+   for (LiveInstance instance : instances)
+   {
+     result.add(new LiveInstance(instance.getRecord()));
+   }
+   return result;
+ }
+
+ /**
+  * Disconnects the manager when it exists and still reports a live
+  * connection; does nothing otherwise.
+  */
+ private static void disconnectIfConnected(HelixManager helixManager)
+ {
+   if (helixManager != null && helixManager.isConnected())
+   {
+     helixManager.disconnect();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
new file mode 100644
index 0000000..2bc09bf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
@@ -0,0 +1,59 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that participants drop the property-transfer service URL when the
+ * controller goes away and pick it up again once a controller is restarted.
+ */
+public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ private static final Logger LOG =
+     Logger.getLogger(TestZKPropertyTransferServer.class);
+
+ /**
+  * Disconnects controller 0, verifies every participant has no service URL,
+  * restarts the controller and verifies every participant sees the URL
+  * reported by {@link ZKPropertyTransferServer}.
+  *
+  * @throws Exception if restarting the controller fails or a sleep is interrupted
+  */
+ @Test
+ public void TestControllerChange() throws Exception
+ {
+   String controllerName = CONTROLLER_PREFIX + "_0";
+   _startCMResultMap.get(controllerName)._manager.disconnect();
+
+   // give the participants time to observe the controller leaving
+   Thread.sleep(1000);
+
+   // kill controller, participant should not know about the svc url
+   for (int i = 0; i < NODE_NR; i++)
+   {
+     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+     HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+     ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
+     Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
+                           || zkAccessor._zkPropertyTransferSvcUrl.equals(""),
+                       "Expecting no property transfer svc url on " + instanceName);
+   }
+   _startCMResultMap.get(controllerName)._thread.interrupt();
+   _startCMResultMap.remove(controllerName);
+
+   StartCMResult startResult =
+       TestHelper.startController(CLUSTER_NAME,
+                                  controllerName,
+                                  ZK_ADDR,
+                                  HelixControllerMain.STANDALONE);
+   _startCMResultMap.put(controllerName, startResult);
+
+   // give the participants time to observe the new controller
+   Thread.sleep(1000);
+
+   // create controller again, the svc url is notified to the participants
+   for (int i = 0; i < NODE_NR; i++)
+   {
+     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+     HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+     ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
+     // assertEquals tolerates a null actual value, so a URL that was never
+     // propagated is reported as an assertion failure rather than a
+     // NullPointerException.
+     Assert.assertEquals(zkAccessor._zkPropertyTransferSvcUrl,
+                         ZKPropertyTransferServer.getInstance().getWebserviceUrl(),
+                         "Wrong property transfer svc url on " + instanceName);
+   }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
new file mode 100644
index 0000000..07fa06a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKUtil extends ZkUnitTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestZKUtil.class);
+
+ String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+ ZkClient _zkClient;
+
+ @BeforeClass()
+ public void beforeClass() throws IOException, Exception
+ {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ if (_zkClient.exists("/" + clusterName))
+ {
+ _zkClient.deleteRecursive("/" + clusterName);
+ }
+
+ boolean result = ZKUtil.isClusterSetup(clusterName, _zkClient);
+ AssertJUnit.assertFalse(result);
+ result = ZKUtil.isClusterSetup(null, _zkClient);
+ AssertJUnit.assertFalse(result);
+
+ result = ZKUtil.isClusterSetup(null, null);
+ AssertJUnit.assertFalse(result);
+
+ result = ZKUtil.isClusterSetup(clusterName, null);
+ AssertJUnit.assertFalse(result);
+
+ TestHelper.setupEmptyCluster(_zkClient, clusterName);
+ }
+
+ @AfterClass()
+ public void afterClass()
+ {
+ _zkClient.close();
+ }
+
+ @Test()
+ public void testIsClusterSetup()
+ {
+ boolean result = ZKUtil.isClusterSetup(clusterName, _zkClient);
+ AssertJUnit.assertTrue(result);
+ }
+
+ @Test()
+ public void testChildrenOperations()
+ {
+ List<ZNRecord> list = new ArrayList<ZNRecord>();
+ list.add(new ZNRecord("id1"));
+ list.add(new ZNRecord("id2"));
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString());
+ ZKUtil.createChildren(_zkClient, path, list);
+ list = ZKUtil.getChildren(_zkClient, path);
+ AssertJUnit.assertEquals(2, list.size());
+
+ ZKUtil.dropChildren(_zkClient, path, list);
+ ZKUtil.dropChildren(_zkClient, path, new ZNRecord("id1"));
+ list = ZKUtil.getChildren(_zkClient, path);
+ AssertJUnit.assertEquals(0, list.size());
+
+ ZKUtil.dropChildren(_zkClient, path, (List<ZNRecord>) null);
+ }
+
+ @Test()
+ public void testUpdateIfExists()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(), "id3");
+ ZNRecord record = new ZNRecord("id4");
+ ZKUtil.updateIfExists(_zkClient, path, record, false);
+ AssertJUnit.assertFalse(_zkClient.exists(path));
+ _zkClient.createPersistent(path);
+ ZKUtil.updateIfExists(_zkClient, path, record, false);
+ AssertJUnit.assertTrue(_zkClient.exists(path));
+ record = _zkClient.<ZNRecord> readData(path);
+ AssertJUnit.assertEquals("id4", record.getId());
+ }
+
+ @Test()
+ public void testSubtract()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(), "id5");
+ ZNRecord record = new ZNRecord("id5");
+ record.setSimpleField("key1", "value1");
+ _zkClient.createPersistent(path, record);
+ ZKUtil.subtract(_zkClient, path, record);
+ record = _zkClient.<ZNRecord> readData(path);
+ AssertJUnit.assertNull(record.getSimpleField("key1"));
+ }
+
+ @Test()
+ public void testNullChildren()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(), "id6");
+ ZKUtil.createChildren(_zkClient, path, (List<ZNRecord>) null);
+ }
+
+ @Test()
+ public void testCreateOrUpdate()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(), "id7");
+ ZNRecord record = new ZNRecord("id7");
+ ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+ record = _zkClient.<ZNRecord> readData(path);
+ AssertJUnit.assertEquals("id7", record.getId());
+ }
+
+ @Test()
+ public void testCreateOrReplace()
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(), "id8");
+ ZNRecord record = new ZNRecord("id8");
+ ZKUtil.createOrReplace(_zkClient, path, record, true);
+ record = _zkClient.<ZNRecord> readData(path);
+ AssertJUnit.assertEquals("id8", record.getId());
+ record = new ZNRecord("id9");
+ ZKUtil.createOrReplace(_zkClient, path, record, true);
+ record = _zkClient.<ZNRecord> readData(path);
+ AssertJUnit.assertEquals("id9", record.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
new file mode 100644
index 0000000..bf85bb2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -0,0 +1,328 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that records serializing to more than 1M are rejected and leave
+ * ZooKeeper untouched, while records just under the limit are written. The
+ * same scenario is run once per serializer, first through {@link ZkClient}
+ * directly and then through {@link ZKHelixDataAccessor}.
+ */
+public class TestZNRecordSizeLimit extends ZkUnitTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestZNRecordSizeLimit.class);
+
+ /** Runs the size-limit scenario with {@link ZNRecordSerializer}. */
+ @Test
+ public void testZNRecordSizeLimitUseZNRecordSerializer()
+ {
+   String className = getShortClassName();
+   System.out.println("START testZNRecordSizeLimitUseZNRecordSerializer at "
+       + new Date(System.currentTimeMillis()));
+
+   ZNRecordSerializer serializer = new ZNRecordSerializer();
+   ZkClient zkClient = new ZkClient(ZK_ADDR);
+   try
+   {
+     zkClient.setZkSerializer(serializer);
+     String root = className;
+     String bufStr = kilobyteString();
+
+     // test zkClient
+     // legal-sized data gets written to zk
+     // write a znode of size less than 1m
+     final ZNRecord smallRecord = new ZNRecord("normalsize");
+     smallRecord.getSimpleFields().clear();
+     putSimpleFields(smallRecord, 0, 900, bufStr);
+
+     String path1 = "/" + root + "/test1";
+     zkClient.createPersistent(path1, true);
+     zkClient.writeData(path1, smallRecord);
+
+     ZNRecord record = zkClient.readData(path1);
+     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+     // oversized data doesn't create any data on zk
+     // prepare a znode of size larger than 1m
+     final ZNRecord largeRecord = new ZNRecord("oversize");
+     largeRecord.getSimpleFields().clear();
+     putSimpleFields(largeRecord, 0, 1024, bufStr);
+     String path2 = "/" + root + "/test2";
+     zkClient.createPersistent(path2, true);
+     assertOversizedWriteRejected(zkClient, path2, largeRecord);
+     record = zkClient.readData(path2);
+     Assert.assertNull(record);
+
+     // oversized write doesn't overwrite existing data on zk
+     record = zkClient.readData(path1);
+     assertOversizedWriteRejected(zkClient, path1, largeRecord);
+     ZNRecord recordNew = zkClient.readData(path1);
+     byte[] arr = serializer.serialize(record);
+     byte[] arrNew = serializer.serialize(recordNew);
+     Assert.assertTrue(Arrays.equals(arr, arrNew));
+
+     // test ZkDataAccessor
+     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+     admin.addCluster(className, true);
+     InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+     admin.addInstance(className, instanceConfig);
+
+     // oversized data should not create any new data on zk
+     ZKHelixDataAccessor accessor =
+         new ZKHelixDataAccessor(className, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+
+     IdealState idealState = new IdealState("currentState");
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 0, 1024, bufStr);
+     boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+     Assert.assertFalse(succeed);
+     // Read back the key that was just rejected, matching the streaming-serializer
+     // test, so the assertion really covers "no new data was created".
+     HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+     Assert.assertNull(property);
+
+     // legal sized data gets written to zk
+     idealState.getRecord().getSimpleFields().clear();
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 0, 900, bufStr);
+     succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+     Assert.assertTrue(succeed);
+     record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+     // oversized data should not update existing data on zk
+     // (fields 900..1023 alone are small; presumably the update is merged with the
+     // 900 stored fields and the result exceeds the limit - confirm)
+     idealState.getRecord().getSimpleFields().clear();
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 900, 1024, bufStr);
+     succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+     Assert.assertFalse(succeed);
+     recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+     arr = serializer.serialize(record);
+     arrNew = serializer.serialize(recordNew);
+     Assert.assertTrue(Arrays.equals(arr, arrNew));
+   }
+   finally
+   {
+     // the client was previously never closed, leaking its ZooKeeper connection
+     zkClient.close();
+   }
+
+   System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /** Runs the size-limit scenario with {@link ZNRecordStreamingSerializer}. */
+ @Test
+ public void testZNRecordSizeLimitUseZNRecordStreamingSerializer()
+ {
+   String className = getShortClassName();
+   System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+       + new Date(System.currentTimeMillis()));
+
+   ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+   ZkClient zkClient = new ZkClient(ZK_ADDR);
+   try
+   {
+     zkClient.setZkSerializer(serializer);
+     String root = className;
+     String bufStr = kilobyteString();
+
+     // test zkClient
+     // legal-sized data gets written to zk
+     // write a znode of size less than 1m
+     final ZNRecord smallRecord = new ZNRecord("normalsize");
+     smallRecord.getSimpleFields().clear();
+     putSimpleFields(smallRecord, 0, 900, bufStr);
+
+     String path1 = "/" + root + "/test1";
+     zkClient.createPersistent(path1, true);
+     zkClient.writeData(path1, smallRecord);
+
+     ZNRecord record = zkClient.readData(path1);
+     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+     // oversized data doesn't create any data on zk
+     // prepare a znode of size larger than 1m
+     final ZNRecord largeRecord = new ZNRecord("oversize");
+     largeRecord.getSimpleFields().clear();
+     putSimpleFields(largeRecord, 0, 1024, bufStr);
+     String path2 = "/" + root + "/test2";
+     zkClient.createPersistent(path2, true);
+     assertOversizedWriteRejected(zkClient, path2, largeRecord);
+     record = zkClient.readData(path2);
+     Assert.assertNull(record);
+
+     // oversized write doesn't overwrite existing data on zk
+     record = zkClient.readData(path1);
+     assertOversizedWriteRejected(zkClient, path1, largeRecord);
+     ZNRecord recordNew = zkClient.readData(path1);
+     byte[] arr = serializer.serialize(record);
+     byte[] arrNew = serializer.serialize(recordNew);
+     Assert.assertTrue(Arrays.equals(arr, arrNew));
+
+     // test ZkDataAccessor
+     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+     admin.addCluster(className, true);
+     InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+     admin.addInstance(className, instanceConfig);
+
+     // oversized data should not create any new data on zk
+     ZKHelixDataAccessor accessor =
+         new ZKHelixDataAccessor(className, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+
+     IdealState idealState = new IdealState("currentState");
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 0, 1024, bufStr);
+     boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
+     Assert.assertFalse(succeed);
+     HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
+     Assert.assertNull(property);
+
+     // legal sized data gets written to zk
+     idealState.getRecord().getSimpleFields().clear();
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 0, 900, bufStr);
+     succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_2"), idealState);
+     Assert.assertTrue(succeed);
+     record = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
+     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+     // oversized data should not update existing data on zk
+     // (fields 900..1023 alone are small; presumably the update is merged with the
+     // 900 stored fields and the result exceeds the limit - confirm)
+     idealState.getRecord().getSimpleFields().clear();
+     configureIdealState(idealState);
+     putSimpleFields(idealState.getRecord(), 900, 1024, bufStr);
+     succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
+     Assert.assertFalse(succeed);
+     recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
+     arr = serializer.serialize(record);
+     arrNew = serializer.serialize(recordNew);
+     Assert.assertTrue(Arrays.equals(arr, arrNew));
+   }
+   finally
+   {
+     // the client was previously never closed, leaking its ZooKeeper connection
+     zkClient.close();
+   }
+
+   System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /** Returns a 1024-character string consisting only of the letter 'a'. */
+ private static String kilobyteString()
+ {
+   byte[] buf = new byte[1024];
+   Arrays.fill(buf, (byte) 'a');
+   return new String(buf);
+ }
+
+ /**
+  * Sets simple fields keyed by the decimal strings of {@code from} (inclusive)
+  * up to {@code to} (exclusive), all carrying the same value.
+  */
+ private static void putSimpleFields(ZNRecord record, int from, int to, String value)
+ {
+   for (int i = from; i < to; i++)
+   {
+     record.setSimpleField(i + "", value);
+   }
+ }
+
+ /** Applies the state model, mode and partition count used by every ideal state in this test. */
+ private static void configureIdealState(IdealState idealState)
+ {
+   idealState.setStateModelDefRef("MasterSlave");
+   idealState.setIdealStateMode("AUTO");
+   idealState.setNumPartitions(10);
+ }
+
+ /**
+  * Attempts to write the record at the path and fails the test unless the
+  * write is rejected with a {@link HelixException}.
+  */
+ private static void assertOversizedWriteRejected(ZkClient zkClient, String path, ZNRecord record)
+ {
+   try
+   {
+     zkClient.writeData(path, record);
+     Assert.fail("Should fail because data size is larger than 1M");
+   }
+   catch (HelixException e)
+   {
+     // expected: the oversized write is rejected
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
new file mode 100644
index 0000000..031cedf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -0,0 +1,318 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkBaseDataAccessor extends ZkUnitTestBase
+{
+ /**
+  * Exercises the single-path operations of {@link ZkBaseDataAccessor} on ten message
+  * znodes: create, set, update, get, exists, getStat and remove. After every write step
+  * the znodes are read back through the raw {@link ZkClient} to check what was stored.
+  */
+ @Test
+ public void testSyncZkBaseDataAccessor()
+ {
+   System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
+
+   String root = "TestZkBaseDataAccessor_syn";
+   ZkClient zkClient = new ZkClient(ZK_ADDR);
+   try
+   {
+     zkClient.setZkSerializer(new ZNRecordSerializer());
+     // start from a clean subtree so earlier runs cannot influence the version checks
+     zkClient.deleteRecursive("/" + root);
+
+     BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+
+     // the ten message ids and their znode paths, shared by every step below
+     List<String> msgIds = new ArrayList<String>();
+     List<String> paths = new ArrayList<String>();
+     for (int i = 0; i < 10; i++)
+     {
+       String msgId = "msg_" + i;
+       msgIds.add(msgId);
+       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId));
+     }
+
+     // test sync create
+     for (int i = 0; i < paths.size(); i++)
+     {
+       boolean success =
+           accessor.create(paths.get(i), new ZNRecord(msgIds.get(i)), AccessOption.PERSISTENT);
+       Assert.assertTrue(success, "Should succeed in create");
+     }
+
+     // test get what we created
+     for (int i = 0; i < paths.size(); i++)
+     {
+       ZNRecord record = zkClient.readData(paths.get(i));
+       Assert.assertEquals(record.getId(), msgIds.get(i), "Should get what we created");
+     }
+
+     // test sync set
+     for (int i = 0; i < paths.size(); i++)
+     {
+       ZNRecord newRecord = new ZNRecord(msgIds.get(i));
+       newRecord.setSimpleField("key1", "value1");
+       boolean success = accessor.set(paths.get(i), newRecord, AccessOption.PERSISTENT);
+       Assert.assertTrue(success, "Should succeed in set");
+     }
+
+     // test get what we set
+     for (String path : paths)
+     {
+       ZNRecord record = zkClient.readData(path);
+       Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
+       Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+     }
+
+     // test sync update
+     for (int i = 0; i < paths.size(); i++)
+     {
+       ZNRecord newRecord = new ZNRecord(msgIds.get(i));
+       newRecord.setSimpleField("key2", "value2");
+       boolean success =
+           accessor.update(paths.get(i), new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+       Assert.assertTrue(success, "Should succeed in update");
+     }
+
+     // test get what we updated: key1 from the set step must still be there, hence 2 fields
+     for (String path : paths)
+     {
+       ZNRecord record = zkClient.readData(path);
+       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+     }
+
+     // test sync get
+     for (String path : paths)
+     {
+       ZNRecord record = accessor.get(path, null, 0);
+       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+       Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+     }
+
+     // test sync exist
+     for (String path : paths)
+     {
+       boolean exists = accessor.exists(path, 0);
+       Assert.assertTrue(exists, "Should exist");
+     }
+
+     // test getStat()
+     for (String path : paths)
+     {
+       Stat stat = accessor.getStat(path, 0);
+       Assert.assertNotNull(stat, "Stat should exist");
+       Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
+     }
+
+     // test sync remove
+     for (String path : paths)
+     {
+       boolean success = accessor.remove(path, 0);
+       Assert.assertTrue(success, "Should remove");
+     }
+
+     // test get what we removed
+     for (String path : paths)
+     {
+       boolean exists = zkClient.exists(path);
+       Assert.assertFalse(exists, "Should be removed");
+     }
+   }
+   finally
+   {
+     // release the ZooKeeper connection even when an assertion above fails
+     zkClient.close();
+   }
+
+   System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testAsyncZkBaseDataAccessor()
+ {
+ System.out.println("START TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
+
+ String root = "TestZkBaseDataAccessor_asyn";
+ ZkClient zkClient = new ZkClient(ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient.deleteRecursive("/" + root);
+
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+
+ // test async createChildren
+ String parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ List<String> paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
+ records.add(new ZNRecord(msgId));
+ }
+ boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+ }
+
+ // test get what we created
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
+ ZNRecord record = zkClient.readData(path);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+ }
+
+ // test async setChildren
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ records = new ArrayList<ZNRecord>();
+ paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
+ ZNRecord newRecord = new ZNRecord(msgId);
+ newRecord.setSimpleField("key1", "value1");
+ records.add(newRecord);
+ }
+ success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in set " + msgId);
+ }
+
+ // test get what we set
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
+ ZNRecord record = zkClient.readData(path);
+ Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
+ Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
+ }
+
+ // test async updateChildren
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+// records = new ArrayList<ZNRecord>();
+ List<DataUpdater<ZNRecord>> znrecordUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
+ paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
+ ZNRecord newRecord = new ZNRecord(msgId);
+ newRecord.setSimpleField("key2", "value2");
+// records.add(newRecord);
+ znrecordUpdaters.add(new ZNRecordUpdater(newRecord));
+ }
+ success = accessor.updateChildren(paths, znrecordUpdaters, AccessOption.PERSISTENT);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in update " + msgId);
+ }
+
+ // test get what we updated
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
+ ZNRecord record = zkClient.readData(path);
+ Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+ Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+ }
+
+ // test async getChildren
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ records = accessor.getChildren(parentPath, null, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ ZNRecord record = records.get(i);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we updated");
+ Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
+ Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
+ }
+
+ // test async exists
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
+ }
+ boolean[] exists = accessor.exists(paths, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(exists[i], "Should exist " + msgId);
+ }
+
+ // test async getStats
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
+ }
+ Stat[] stats = accessor.getStats(paths, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertNotNull(stats[i], "Stat should exist for " + msgId);
+ Assert.assertEquals(stats[i].getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1 for " + msgId);
+ }
+
+ // test async remove
+ parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
+ paths = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
+ }
+ success = accessor.remove(paths, 0);
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in remove " + msgId);
+ }
+
+ // test get what we removed
+ for (int i = 0; i < 10; i++)
+ {
+ String msgId = "msg_" + i;
+ String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
+ boolean pathExists = zkClient.exists(path);
+ Assert.assertFalse(pathExists, "Should be removed " + msgId);
+ }
+
+ zkClient.close();
+ System.out.println("END TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
new file mode 100644
index 0000000..55cdc70
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
@@ -0,0 +1,382 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase
+{
+ /**
+  * Checks that the cache held by a {@link ZkCacheBaseDataAccessor} keeps matching the
+  * data on ZooKeeper while znodes under the cached paths are created, updated, set and
+  * removed through a separate accessor backed by its own {@link ZkClient}.
+  *
+  * @throws Exception if a wait is interrupted or a helper call fails
+  */
+ @Test
+ public void testHappyPathExtOpZkCacheBaseDataAccessor() throws Exception
+ {
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String clusterName = className + "_" + methodName;
+   System.out.println("START " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+
+   // init external base data accessor
+   ZkClient extZkclient = new ZkClient(ZK_ADDR);
+   try
+   {
+     extZkclient.setZkSerializer(new ZNRecordSerializer());
+     ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
+         new ZkBaseDataAccessor<ZNRecord>(extZkclient);
+
+     // init zkCacheBaseDataAccessor
+     String curStatePath =
+         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+     String extViewPath =
+         PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+     ZkBaseDataAccessor<ZNRecord> baseAccessor =
+         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+     extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+     List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
+     ZkCacheBaseDataAccessor<ZNRecord> accessor =
+         new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
+
+     boolean ret =
+         TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+     // create 10 current states using external base accessor
+     List<String> paths = new ArrayList<String>();
+     List<ZNRecord> records = new ArrayList<ZNRecord>();
+     for (int i = 0; i < 10; i++)
+     {
+       paths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                            clusterName,
+                                            "localhost_8901",
+                                            "session_0",
+                                            "TestDB" + i));
+       records.add(new ZNRecord("TestDB" + i));
+     }
+
+     boolean[] success = extBaseAccessor.createChildren(paths, records, AccessOption.PERSISTENT);
+     for (int i = 0; i < 10; i++)
+     {
+       Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
+     }
+
+     // wait for the zk event thread to update zkCache
+     Thread.sleep(100);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+     // update each current state 10 times by external base accessor
+     List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+     for (int j = 0; j < 10; j++)
+     {
+       // every round submits a fresh batch of exactly ten updates
+       paths.clear();
+       updaters.clear();
+       for (int i = 0; i < 10; i++)
+       {
+         ZNRecord newRecord = new ZNRecord("TestDB" + i);
+         newRecord.setSimpleField("" + j, "" + j);
+         paths.add(curStatePath + "/session_0/TestDB" + i);
+         updaters.add(new ZNRecordUpdater(newRecord));
+       }
+       success = extBaseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+
+       for (int i = 0; i < 10; i++)
+       {
+         Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
+       }
+     }
+
+     // wait for the zk event thread to update zkCache
+     Thread.sleep(100);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+     // set 10 external views by external accessor
+     paths.clear();
+     records.clear();
+     for (int i = 0; i < 10; i++)
+     {
+       paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+       records.add(new ZNRecord("TestDB" + i));
+     }
+     success = extBaseAccessor.setChildren(paths, records, AccessOption.PERSISTENT);
+     for (int i = 0; i < 10; i++)
+     {
+       Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
+     }
+
+     // wait for the zk event thread to update zkCache
+     Thread.sleep(100);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+     // remove 10 external views by external accessor
+     paths.clear();
+     for (int i = 0; i < 10; i++)
+     {
+       paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+     }
+     success = extBaseAccessor.remove(paths, 0);
+     for (int i = 0; i < 10; i++)
+     {
+       Assert.assertTrue(success[i], "Should succeed in remove: " + paths.get(i));
+     }
+
+     // wait for the zk event thread to update zkCache
+     Thread.sleep(100);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+   }
+   finally
+   {
+     // clean up: close the external connection even when an assertion above fails
+     extZkclient.close();
+   }
+
+   System.out.println("END " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Checks that the cache held by a {@link ZkCacheBaseDataAccessor} keeps matching the
+  * data on ZooKeeper while znodes under the cached paths are created, updated and set
+  * through that same accessor, and that its batch get, getChildren and exists calls
+  * return the expected entries afterwards.
+  *
+  * @throws Exception if a helper call fails
+  */
+ @Test
+ public void testHappyPathSelfOpZkCacheBaseDataAccessor() throws Exception
+ {
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String clusterName = className + "_" + methodName;
+   System.out.println("START " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+
+   // init zkCacheDataAccessor
+   String curStatePath =
+       PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+   String extViewPath =
+       PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+   ZkBaseDataAccessor<ZNRecord> baseAccessor =
+       new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+   baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+   List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
+   ZkCacheBaseDataAccessor<ZNRecord> accessor =
+       new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
+
+   boolean ret =
+       TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+   Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+   // create 10 current states using this accessor
+   List<String> paths = new ArrayList<String>();
+   List<ZNRecord> records = new ArrayList<ZNRecord>();
+   for (int i = 0; i < 10; i++)
+   {
+     paths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                          clusterName,
+                                          "localhost_8901",
+                                          "session_0",
+                                          "TestDB" + i));
+     records.add(new ZNRecord("TestDB" + i));
+   }
+
+   boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
+   for (int i = 0; i < 10; i++)
+   {
+     Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
+   }
+
+   // verify cache (this check passes false as the last verifyZkCache argument)
+   ret =
+       TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, false);
+   Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+   // update each current state 10 times by this accessor
+   List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+   for (int j = 0; j < 10; j++)
+   {
+     // every round submits a fresh batch of exactly ten updates
+     paths.clear();
+     updaters.clear();
+     for (int i = 0; i < 10; i++)
+     {
+       ZNRecord newRecord = new ZNRecord("TestDB" + i);
+       newRecord.setSimpleField("" + j, "" + j);
+       paths.add(curStatePath + "/session_0/TestDB" + i);
+       updaters.add(new ZNRecordUpdater(newRecord));
+     }
+     success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+
+     for (int i = 0; i < 10; i++)
+     {
+       Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
+     }
+   }
+
+   // verify cache
+   // NOTE(review): unlike the other checks this one uses the five-argument verifyZkCache
+   // form and passes zkCacheInitPaths twice -- confirm that is intended.
+   ret =
+       TestHelper.verifyZkCache(zkCacheInitPaths,
+                                zkCacheInitPaths,
+                                accessor._zkCache._cache,
+                                _gZkClient,
+                                true);
+   Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+   // set 10 external views 10 times by this accessor
+   for (int j = 0; j < 10; j++)
+   {
+     // Reset the batch every round. Without this the lists keep growing, so later rounds
+     // would submit several different records for the same path in one batch while only
+     // the first ten results were ever checked.
+     paths.clear();
+     records.clear();
+     for (int i = 0; i < 10; i++)
+     {
+       ZNRecord record = new ZNRecord("TestDB" + i);
+       record.setSimpleField("setKey", "" + j);
+       paths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i));
+       records.add(record);
+     }
+     success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
+     for (int i = 0; i < 10; i++)
+     {
+       Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
+     }
+   }
+
+   // verify cache
+   ret =
+       TestHelper.verifyZkCache(zkCacheInitPaths, accessor._zkCache._cache, _gZkClient, true);
+   Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+
+   // get 10 external views
+   paths.clear();
+   for (int i = 0; i < 10; i++)
+   {
+     paths.add(extViewPath + "/TestDB" + i);
+   }
+
+   records = accessor.get(paths, null, 0);
+   for (int i = 0; i < 10; i++)
+   {
+     Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+   }
+
+   // getChildren
+   records = accessor.getChildren(extViewPath, null, 0);
+   for (int i = 0; i < 10; i++)
+   {
+     Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
+   }
+
+   // exists
+   paths.clear();
+   for (int i = 0; i < 10; i++)
+   {
+     paths.add(curStatePath + "/session_0/TestDB" + i);
+   }
+   success = accessor.exists(paths, 0);
+   for (int i = 0; i < 10; i++)
+   {
+     Assert.assertTrue(success[i], "Should exits: " + paths.get(i));
+   }
+
+   System.out.println("END " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
new file mode 100644
index 0000000..98c6cc3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
@@ -0,0 +1,209 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyListener;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase
+{
+ /**
+  * {@link HelixPropertyListener} that records the path of every callback it receives,
+  * using one queue per callback type, so a test can inspect which callbacks were delivered.
+  */
+ class TestListener implements HelixPropertyListener
+ {
+   // paths received through onDataCreate / onDataChange / onDataDelete respectively
+   ConcurrentLinkedQueue<String> _createPathQueue = new ConcurrentLinkedQueue<String>();
+   ConcurrentLinkedQueue<String> _changePathQueue = new ConcurrentLinkedQueue<String>();
+   ConcurrentLinkedQueue<String> _deletePathQueue = new ConcurrentLinkedQueue<String>();
+
+   /** Remembers {@code path} as created. */
+   @Override
+   public void onDataCreate(String path)
+   {
+     _createPathQueue.add(path);
+   }
+
+   /** Remembers {@code path} as changed. */
+   @Override
+   public void onDataChange(String path)
+   {
+     _changePathQueue.add(path);
+   }
+
+   /** Remembers {@code path} as deleted. */
+   @Override
+   public void onDataDelete(String path)
+   {
+     _deletePathQueue.add(path);
+   }
+
+   /** Forgets every path recorded so far, for all three callback types. */
+   public void reset()
+   {
+     _createPathQueue.clear();
+     _changePathQueue.clear();
+     _deletePathQueue.clear();
+   }
+ }
+
+ /**
+  * Subscribes a {@link TestListener} to the current-state path of a
+  * {@link ZkCacheBaseDataAccessor} and checks that creating, updating and removing ten
+  * current-state znodes yields the expected create/change/delete callbacks, and that the
+  * cache matches the data on ZooKeeper after each phase.
+  *
+  * @throws Exception if a wait is interrupted or a helper call fails
+  */
+ @Test
+ public void testZkCacheCallbackExternalOpNoChroot() throws Exception
+ {
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String clusterName = className + "_" + methodName;
+   System.out.println("START " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+
+   // init external base data accessor
+   ZkClient zkclient = new ZkClient(ZK_ADDR);
+   try
+   {
+     zkclient.setZkSerializer(new ZNRecordSerializer());
+     ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
+         new ZkBaseDataAccessor<ZNRecord>(zkclient);
+
+     // init zkCacheDataAccessor
+     String curStatePath =
+         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_8901");
+     String extViewPath =
+         PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+
+     ZkBaseDataAccessor<ZNRecord> baseAccessor =
+         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+     extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
+
+     List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
+     ZkCacheBaseDataAccessor<ZNRecord> accessor =
+         new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, cachePaths);
+
+     TestListener listener = new TestListener();
+     accessor.subscribe(curStatePath, listener);
+
+     // create 10 current states through the external accessor
+     List<String> createPaths = new ArrayList<String>();
+     for (int i = 0; i < 10; i++)
+     {
+       String path = curStatePath + "/session_0/TestDB" + i;
+       createPaths.add(path);
+       boolean success =
+           extBaseAccessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
+       Assert.assertTrue(success, "Should succeed in create: " + path);
+     }
+
+     // give the callbacks time to arrive
+     Thread.sleep(500);
+
+     // verify cache
+     boolean ret =
+         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+     System.out.println("createCnt: " + listener._createPathQueue.size());
+     Assert.assertEquals(listener._createPathQueue.size(),
+                         11,
+                         "Shall get 11 onCreate callbacks.");
+
+     // verify each callback path; the 11th callback is for the session_0 parent znode
+     createPaths.add(curStatePath + "/session_0");
+     List<String> createCallbackPaths = new ArrayList<String>(listener._createPathQueue);
+     Collections.sort(createPaths);
+     Collections.sort(createCallbackPaths);
+     Assert.assertEquals(createCallbackPaths,
+                         createPaths,
+                         "Should get create callbacks at " + createPaths + ", but was "
+                             + createCallbackPaths);
+
+     // update each current state 10 times through the cache accessor, single thread
+     List<String> updatePaths = new ArrayList<String>();
+     listener.reset();
+     for (int i = 0; i < 10; i++)
+     {
+       String path = curStatePath + "/session_0/TestDB" + i;
+       for (int j = 0; j < 10; j++)
+       {
+         updatePaths.add(path);
+         ZNRecord newRecord = new ZNRecord("TestDB" + i);
+         newRecord.setSimpleField("" + j, "" + j);
+         boolean success =
+             accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+         Assert.assertTrue(success, "Should succeed in update: " + path);
+       }
+     }
+     Thread.sleep(500);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+     System.out.println("changeCnt: " + listener._changePathQueue.size());
+     Assert.assertEquals(listener._changePathQueue.size(),
+                         100,
+                         "Shall get 100 onChange callbacks.");
+
+     // verify each callback path
+     List<String> updateCallbackPaths = new ArrayList<String>(listener._changePathQueue);
+     Collections.sort(updatePaths);
+     Collections.sort(updateCallbackPaths);
+     Assert.assertEquals(updateCallbackPaths,
+                         updatePaths,
+                         "Should get change callbacks at " + updatePaths + ", but was "
+                             + updateCallbackPaths);
+
+     // remove 10 current states through the cache accessor
+     List<String> removePaths = new ArrayList<String>();
+     listener.reset();
+     for (int i = 0; i < 10; i++)
+     {
+       String path = curStatePath + "/session_0/TestDB" + i;
+       removePaths.add(path);
+       boolean success = accessor.remove(path, AccessOption.PERSISTENT);
+       Assert.assertTrue(success, "Should succeed in remove: " + path);
+     }
+     Thread.sleep(500);
+
+     // verify cache
+     ret =
+         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
+     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
+     System.out.println("deleteCnt: " + listener._deletePathQueue.size());
+     Assert.assertEquals(listener._deletePathQueue.size(),
+                         10,
+                         "Shall get 10 onDelete callbacks.");
+
+     // verify each callback path
+     List<String> removeCallbackPaths = new ArrayList<String>(listener._deletePathQueue);
+     Collections.sort(removePaths);
+     Collections.sort(removeCallbackPaths);
+     Assert.assertEquals(removeCallbackPaths,
+                         removePaths,
+                         "Should get remove callbacks at " + removePaths + ", but was "
+                             + removeCallbackPaths);
+   }
+   finally
+   {
+     // this client was previously never closed, on success or on failure
+     zkclient.close();
+   }
+
+   System.out.println("END " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
new file mode 100644
index 0000000..4054679
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.MockListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.store.PropertyStore;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZkClusterManager extends ZkUnitTestBase
+{
+ final String className = getShortClassName();
+
+ @Test()
+ public void testController() throws Exception
+ {
+ System.out.println("START " + className + ".testController() at " + new Date(System.currentTimeMillis()));
+ final String clusterName = CLUSTER_PREFIX + "_" + className + "_controller";
+
+ // basic test
+ if (_gZkClient.exists("/" + clusterName))
+ {
+ _gZkClient.deleteRecursive("/" + clusterName);
+ }
+
+ ZKHelixManager controller = new ZKHelixManager(clusterName, null,
+ InstanceType.CONTROLLER,
+ ZK_ADDR);
+ try
+ {
+ controller.connect();
+ Assert.fail("Should throw HelixException if initial cluster structure is not setup");
+ } catch (HelixException e)
+ {
+ // OK
+ }
+
+ TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+ controller.connect();
+ AssertJUnit.assertTrue(controller.isConnected());
+ controller.connect();
+ AssertJUnit.assertTrue(controller.isConnected());
+
+ MockListener listener = new MockListener();
+ listener.reset();
+
+ try
+ {
+ controller.addControllerListener(null);
+ Assert.fail("Should throw HelixException");
+ } catch (HelixException e)
+ {
+ // OK
+ }
+
+ controller.addControllerListener(listener);
+ AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
+ controller.removeListener(listener);
+
+ PropertyStore<ZNRecord> store = controller.getPropertyStore();
+ ZNRecord record = new ZNRecord("node_1");
+ store.setProperty("node_1", record);
+ record = store.getProperty("node_1");
+ AssertJUnit.assertEquals("node_1", record.getId());
+
+ controller.getMessagingService();
+ controller.getHealthReportCollector();
+ controller.getClusterManagmentTool();
+
+ controller.handleNewSession();
+ controller.disconnect();
+ AssertJUnit.assertFalse(controller.isConnected());
+
+ System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test()
+ public void testAdministrator() throws Exception
+ {
+ System.out.println("START " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
+ final String clusterName = CLUSTER_PREFIX + "_" + className + "_admin";
+
+ // basic test
+ if (_gZkClient.exists("/" + clusterName))
+ {
+ _gZkClient.deleteRecursive("/" + clusterName);
+ }
+
+ ZKHelixManager admin = new ZKHelixManager(clusterName, null,
+ InstanceType.ADMINISTRATOR,
+ ZK_ADDR);
+
+ TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+ admin.connect();
+ AssertJUnit.assertTrue(admin.isConnected());
+
+ HelixAdmin adminTool = admin.getClusterManagmentTool();
+ ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
+ .forResource("testResource").forPartition("testPartition").build();
+ Map<String, String> properties = new HashMap<String, String>();
+ properties.put("pKey1", "pValue1");
+ properties.put("pKey2", "pValue2");
+ adminTool.setConfig(scope, properties);
+
+ properties = adminTool.getConfig(scope, TestHelper.setOf("pKey1", "pKey2"));
+ Assert.assertEquals(properties.size(), 2);
+ Assert.assertEquals(properties.get("pKey1"), "pValue1");
+ Assert.assertEquals(properties.get("pKey2"), "pValue2");
+
+ admin.disconnect();
+ AssertJUnit.assertFalse(admin.isConnected());
+
+ System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
new file mode 100644
index 0000000..c520b68
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -0,0 +1,198 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+/**
+ * Walks {@link ZKHelixAdmin} through cluster, instance, state-model, resource and config
+ * operations against the shared test ZooKeeper client.
+ */
+public class TestZkHelixAdmin extends ZkUnitTestBase
+{
+  /**
+   * Runs the admin operations in sequence. Later steps rely on the ZooKeeper state left
+   * behind by earlier ones, so the statement order is significant.
+   */
+  @Test()
+  public void testZkHelixAdmin()
+  {
+    System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
+
+    // start from a clean slate: drop any cluster znode left behind by an earlier run
+    final String clusterName = getShortClassName();
+    if (_gZkClient.exists("/" + clusterName))
+    {
+      _gZkClient.deleteRecursive("/" + clusterName);
+    }
+
+    // adding the cluster twice with the flag set to true must succeed both times
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    tool.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+    tool.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+
+    List<String> list = tool.getClusters();
+    AssertJUnit.assertTrue(list.size() > 0);
+
+    // with the flag set to false, adding an existing cluster is an error
+    try
+    {
+      tool.addCluster(clusterName, false);
+      Assert.fail("should fail if add an already existing cluster");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+
+    // ---- instance lifecycle: add, enable, look up, drop ----
+    InstanceConfig config = new InstanceConfig("host1_9999");
+    config.setHostName("host1");
+    config.setPort("9999");
+    tool.addInstance(clusterName, config);
+    tool.enableInstance(clusterName, "host1_9999", true);
+    String path = PropertyPathConfig.getPath(PropertyType.INSTANCES,
+                                             clusterName, "host1_9999");
+    AssertJUnit.assertTrue(_gZkClient.exists(path));
+
+    try
+    {
+      tool.addInstance(clusterName, config);
+      Assert.fail("should fail if add an already-existing instance");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    config = tool.getInstanceConfig(clusterName, "host1_9999");
+    // AssertJUnit takes (expected, actual)
+    AssertJUnit.assertEquals("host1_9999", config.getId());
+
+    // once dropped, every operation on the instance must be rejected
+    tool.dropInstance(clusterName, config);
+    try
+    {
+      tool.getInstanceConfig(clusterName, "host1_9999");
+      Assert.fail("should fail if get a non-existent instance");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    try
+    {
+      tool.dropInstance(clusterName, config);
+      Assert.fail("should fail if drop on a non-existent instance");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    try
+    {
+      tool.enableInstance(clusterName, "host1_9999", false);
+      Assert.fail("should fail if enable a non-existent instance");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+
+    // ---- state model definitions ----
+    // NOTE(review): the record carries only an id. The statements after addStateModelDef()
+    // end in an unconditional fail(), so this block passes only when the call throws.
+    ZNRecord stateModelRecord = new ZNRecord("id1");
+    try
+    {
+      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
+          stateModelRecord));
+      path = PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
+                                        clusterName, "id1");
+      AssertJUnit.assertTrue(_gZkClient.exists(path));
+      Assert.fail("should fail");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    // NOTE(review): the message below speaks of an already-existing state model, yet the
+    // size check that follows requires that no definition was stored - confirm the intent.
+    try
+    {
+      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
+          stateModelRecord));
+      Assert.fail("should fail if add an already-existing state model");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    list = tool.getStateModelDefs(clusterName);
+    AssertJUnit.assertEquals(0, list.size());
+
+    // ---- resources: every add must fail, so the cluster stays without resources ----
+    try
+    {
+      tool.addResource(clusterName, "resource", 10,
+                       "nonexistStateModelDef");
+      Assert
+          .fail("should fail if add a resource without an existing state model");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    try
+    {
+      tool.addResource(clusterName, "resource", 10, "id1");
+      Assert.fail("should fail");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    list = tool.getResourcesInCluster(clusterName);
+    AssertJUnit.assertEquals(0, list.size());
+    // repeating the failed add must give the same result
+    try
+    {
+      tool.addResource(clusterName, "resource", 10, "id1");
+      Assert.fail("should fail");
+    }
+    catch (HelixException e)
+    {
+      // expected
+    }
+    list = tool.getResourcesInCluster(clusterName);
+    AssertJUnit.assertEquals(0, list.size());
+
+    // no resource was ever added, so there is no external view for it
+    ExternalView resourceExternalView = tool.getResourceExternalView(
+        clusterName, "resource");
+    AssertJUnit.assertNull(resourceExternalView);
+
+    // test config support
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
+        .forResource("testResource").forPartition("testPartition").build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("pKey1", "pValue1");
+    properties.put("pKey2", "pValue2");
+
+    // make sure calling set/getConfig() many times will not drain zkClient resources
+    // int nbOfZkClients = ZkClient.getNumberOfConnections();
+    for (int i = 0; i < 100; i++)
+    {
+      tool.setConfig(scope, properties);
+      Map<String, String> newProperties = tool.getConfig(scope, properties.keySet());
+      Assert.assertEquals(newProperties.size(), 2);
+      Assert.assertEquals(newProperties.get("pKey1"), "pValue1");
+      Assert.assertEquals(newProperties.get("pKey2"), "pValue2");
+    }
+    // Assert.assertTrue(ZkClient.getNumberOfConnections() - nbOfZkClients < 5);
+
+    System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
new file mode 100644
index 0000000..adda6cb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallback.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.model.Message;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the reply counting and timeout behaviour of {@link AsyncCallback} through a small
+ * subclass that counts how often its hooks are invoked.
+ */
+public class TestAsyncCallback
+{
+  /** Callback that counts invocations of onTimeOut() and onReplyMessage(). */
+  class AsyncCallbackSample extends AsyncCallback
+  {
+    int _onTimeOutCalled = 0;
+    int _onReplyMessageCalled = 0;
+
+    @Override
+    public void onTimeOut()
+    {
+      _onTimeOutCalled++;
+    }
+
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      _onReplyMessageCalled++;
+    }
+  }
+
+  /**
+   * Runs three scenarios against a callback that expects {@code nMsgs} replies:
+   * all replies without a timer, too few replies before a 1000 ms timeout, and all
+   * replies before a 1000 ms timeout.
+   *
+   * @throws Exception if the callback under test throws
+   */
+  @Test()
+  public void testAsyncCallback() throws Exception
+  {
+    System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
+    AsyncCallbackSample callback = new AsyncCallbackSample();
+    AssertJUnit.assertFalse(callback.isInterrupted());
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    AssertJUnit.assertEquals(0, callback.getMessageReplied().size());
+
+    int nMsgs = 5;
+
+    List<Message> messageSent = new ArrayList<Message>();
+    for (int i = 0; i < nMsgs; i++)
+    {
+      messageSent.add(new Message("Test", UUID.randomUUID().toString()));
+    }
+
+    // scenario 1: no timer; the callback becomes done only after the last reply
+    callback.setMessagesSent(messageSent);
+
+    for (int i = 0; i < nMsgs; i++)
+    {
+      AssertJUnit.assertFalse(callback.isDone());
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    AssertJUnit.assertTrue(callback.isDone());
+
+    AssertJUnit.assertEquals(0, callback._onTimeOutCalled);
+
+    // scenario 2: one reply is withheld, so the 1000 ms timer must fire exactly once
+    sleep(50);
+    callback = new AsyncCallbackSample();
+    callback.setMessagesSent(messageSent);
+    callback.setTimeout(1000);
+    sleep(50);
+    callback.startTimer();
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    for (int i = 0; i < nMsgs - 1; i++)
+    {
+      sleep(50);
+      AssertJUnit.assertFalse(callback.isDone());
+      AssertJUnit.assertEquals(i, callback._onReplyMessageCalled);
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    sleep(1000);
+    AssertJUnit.assertTrue(callback.isTimedOut());
+    AssertJUnit.assertEquals(1, callback._onTimeOutCalled);
+    AssertJUnit.assertFalse(callback.isDone());
+
+    // scenario 3: every reply arrives in time, so the timer must never fire
+    callback = new AsyncCallbackSample();
+    callback.setMessagesSent(messageSent);
+    callback.setTimeout(1000);
+    callback.startTimer();
+    sleep(50);
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    for (int i = 0; i < nMsgs; i++)
+    {
+      AssertJUnit.assertFalse(callback.isDone());
+      sleep(50);
+      AssertJUnit.assertEquals(i, callback._onReplyMessageCalled);
+      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
+    }
+    AssertJUnit.assertTrue(callback.isDone());
+    // wait past the original deadline to prove the timeout stays silent
+    sleep(1300);
+    AssertJUnit.assertFalse(callback.isTimedOut());
+    AssertJUnit.assertEquals(0, callback._onTimeOutCalled);
+    System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Sleeps for the given number of milliseconds. An interruption is printed and the
+   * thread's interrupt flag is restored so that the caller and the test runner can
+   * still observe it.
+   *
+   * @param time sleep duration in milliseconds
+   */
+  void sleep(int time)
+  {
+    try
+    {
+      Thread.sleep(time);
+    }
+    catch (InterruptedException e)
+    {
+      System.out.println(e);
+      Thread.currentThread().interrupt();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
new file mode 100644
index 0000000..7908ab2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.AsyncCallbackService;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.TestHelixTaskExecutor.MockClusterManager;
+import org.apache.helix.model.Message;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAsyncCallbackSvc
+{
+ class MockHelixManager extends Mocks.MockManager
+ {
+ public String getSessionId()
+ {
+ return "123";
+ }
+ }
+
+ class TestAsyncCallback extends AsyncCallback
+ {
+ HashSet<String> _repliedMessageId = new HashSet<String>();
+ @Override
+ public void onTimeOut()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void onReplyMessage(Message message)
+ {
+ // TODO Auto-generated method stub
+ _repliedMessageId.add(message.getMsgId());
+ }
+
+ }
+ @Test(groups =
+ { "unitTest" })
+ public void testAsyncCallbackSvc() throws Exception
+ {
+ AsyncCallbackService svc = new AsyncCallbackService();
+ HelixManager manager = new MockHelixManager();
+ NotificationContext changeContext = new NotificationContext(manager);
+
+ Message msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ try
+ {
+ MessageHandler aHandler = svc.createHandler(msg, changeContext);
+ }
+ catch(HelixException e)
+ {
+ AssertJUnit.assertTrue(e.getMessage().indexOf(msg.getMsgId())!= -1);
+ }
+ Message msg2 = new Message("RandomType", UUID.randomUUID().toString());
+ msg2.setTgtSessionId(manager.getSessionId());
+ try
+ {
+ MessageHandler aHandler = svc.createHandler(msg2, changeContext);
+ }
+ catch(HelixException e)
+ {
+ AssertJUnit.assertTrue(e.getMessage().indexOf(msg2.getMsgId())!= -1);
+ }
+ Message msg3 = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+ msg3.setTgtSessionId(manager.getSessionId());
+ msg3.setCorrelationId("wfwegw");
+ try
+ {
+ MessageHandler aHandler = svc.createHandler(msg3, changeContext);
+ }
+ catch(HelixException e)
+ {
+ AssertJUnit.assertTrue(e.getMessage().indexOf(msg3.getMsgId())!= -1);
+ }
+
+ TestAsyncCallback callback = new TestAsyncCallback();
+ String corrId = UUID.randomUUID().toString();
+ svc.registerAsyncCallback(corrId, new TestAsyncCallback());
+ svc.registerAsyncCallback(corrId, callback);
+
+ List<Message> msgSent = new ArrayList<Message>();
+ msgSent.add(new Message("Test", UUID.randomUUID().toString()));
+ callback.setMessagesSent(msgSent);
+
+ msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setCorrelationId(corrId);
+
+ MessageHandler aHandler = svc.createHandler(msg, changeContext);
+ Map<String, String> resultMap = new HashMap<String, String>();
+ aHandler.handleMessage();
+
+ AssertJUnit.assertTrue(callback.isDone());
+ AssertJUnit.assertTrue(callback._repliedMessageId.contains(msg.getMsgId()));
+ }
+}