You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[41/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestWtCacheSyncOpSingleThread.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
deleted file mode 100644
index fb5879e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestWtCacheSyncOpSingleThread.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.ZkUnitTestBase;
-
-/**
- * Single-threaded tests of {@link ZkCacheBaseDataAccessor}. After each group of writes the
- * accessor's {@code _wtCache} is compared against ZooKeeper via
- * {@code TestHelper.verifyZkCache}.
- */
-public class TestWtCacheSyncOpSingleThread extends ZkUnitTestBase
-{
-  // TODO: add TestZkCacheSyncOpSingleThread
-  // TODO: add TestZkCacheAsyncOpMultiThread
-
-  /**
-   * Drives create, update, set, get, getChildNames and exists through the cached accessor for
-   * the records TestDB0..TestDB9 and verifies the cache after every write phase.
-   *
-   * @throws Exception if any accessor or verification call throws
-   */
-  @Test
-  public void testHappyPathZkCacheBaseDataAccessor() throws Exception
-  {
-    // Keep these two calls directly in the test method body.
-    // NOTE(review): they presumably derive the names from the calling frame - confirm before
-    // moving them into a helper.
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    // init zkCacheDataAccessor
-    String curStatePath =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                   clusterName,
-                                   "localhost_8901");
-    String extViewPath =
-        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
-
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-
-    baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
-
-    List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
-    ZkCacheBaseDataAccessor<ZNRecord> accessor =
-        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
-                                              null,
-                                              cachePaths,
-                                              null);
-
-    boolean ret =
-        TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, true);
-    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
-
-    // create 10 current states
-    for (int i = 0; i < 10; i++)
-    {
-      String path = curStatePath + "/session_0/TestDB" + i;
-      boolean success =
-          accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in create: " + path);
-    }
-
-    // verify wtCache
-    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
-    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
-
-    // update each current state 10 times, single thread
-    for (int i = 0; i < 10; i++)
-    {
-      String path = curStatePath + "/session_0/TestDB" + i;
-      for (int j = 0; j < 10; j++)
-      {
-        ZNRecord newRecord = new ZNRecord("TestDB" + i);
-        newRecord.setSimpleField("" + j, "" + j);
-        boolean success =
-            accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
-        Assert.assertTrue(success, "Should succeed in update: " + path);
-      }
-    }
-
-    // verify cache
-    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
-    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
-
-    // set 10 external views
-    for (int i = 0; i < 10; i++)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
-      boolean success =
-          accessor.set(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in set: " + path);
-    }
-
-    // verify wtCache
-    ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
-    Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
-
-    // get 10 external views
-    for (int i = 0; i < 10; i++)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
-      ZNRecord record = accessor.get(path, null, 0);
-      // Fail with a readable message instead of an NPE when the record is missing.
-      Assert.assertNotNull(record, "Should get a record for: " + path);
-      Assert.assertEquals(record.getId(), "TestDB" + i);
-    }
-
-    // getChildNames
-    List<String> childNames = accessor.getChildNames(extViewPath, 0);
-    Assert.assertNotNull(childNames, "Should get child names for: " + extViewPath);
-    Assert.assertEquals(childNames.size(), 10, "Should contain only: TestDB0-9");
-    for (int i = 0; i < 10; i++)
-    {
-      Assert.assertTrue(childNames.contains("TestDB" + i));
-    }
-
-    // exists
-    for (int i = 0; i < 10; i++)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                     clusterName,
-                                     "localhost_8901",
-                                     "session_0",
-                                     "TestDB" + i);
-
-      Assert.assertTrue(accessor.exists(path, 0));
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * Creates ten current-state records under session_1 through the cached accessor, then
-   * creates the same ten paths again and expects every second create to return false.
-   */
-  @Test
-  public void testCreateFailZkCacheBaseDataAccessor()
-  {
-    // Keep these two calls directly in the test method body (see the happy-path test).
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    // init zkCacheDataAccessor
-    String curStatePath =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                   clusterName,
-                                   "localhost_8901");
-
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-
-    ZkCacheBaseDataAccessor<ZNRecord> accessor =
-        new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
-                                              null,
-                                              Arrays.asList(curStatePath),
-                                              null);
-
-    // create 10 current states
-    for (int i = 0; i < 10; i++)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                     clusterName,
-                                     "localhost_8901",
-                                     "session_1",
-                                     "TestDB" + i);
-      boolean success =
-          accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in create: " + path);
-    }
-
-    // create same 10 current states again, should fail
-    for (int i = 0; i < 10; i++)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                     clusterName,
-                                     "localhost_8901",
-                                     "session_1",
-                                     "TestDB" + i);
-      boolean success =
-          accessor.create(path, new ZNRecord("TestDB" + i), AccessOption.PERSISTENT);
-      Assert.assertFalse(success, "Should fail in create due to NodeExists: " + path);
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessor.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessor.java
deleted file mode 100644
index fbf7fcf..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-
-
-/**
- * Tests set/get/remove/update/getChildValues on a {@link ZKDataAccessor} by cross-checking
- * its results against direct reads and writes through a raw {@link ZkClient}.
- */
-public class TestZKDataAccessor extends ZkUnitTestBase
-{
-  private DataAccessor _accessor;
-  private String _clusterName;
-  private final String resource = "resource";
-  private ZkClient _zkClient;
-
-  /**
-   * Sets an ideal state twice through the accessor and checks after each call that the znode
-   * exists and holds the ideal state's record.
-   */
-  @Test ()
-  public void testSet()
-  {
-    IdealState idealState = new IdealState(resource);
-    idealState.setNumPartitions(20);
-    idealState.setReplicas(Integer.toString(2));
-    idealState.setStateModelDefRef("StateModel1");
-    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
-    boolean success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
-    AssertJUnit.assertTrue(success);
-    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
-    AssertJUnit.assertTrue(_zkClient.exists(path));
-    AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
-
-    // NOTE(review): same value (20) as the first set - presumably meant to exercise an
-    // overwrite with different data; confirm before changing.
-    idealState.setNumPartitions(20);
-    success = _accessor.setProperty(PropertyType.IDEALSTATES, idealState, resource);
-    AssertJUnit.assertTrue(success);
-    AssertJUnit.assertTrue(_zkClient.exists(path));
-    AssertJUnit.assertEquals(idealState.getRecord(), _zkClient.readData(path));
-  }
-
-  /**
-   * Writes an ideal state directly with the ZkClient and checks that the accessor reads back
-   * an equal record.
-   */
-  @Test ()
-  public void testGet()
-  {
-    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
-    IdealState idealState = recreateAutoIdealStateNode(path);
-
-    IdealState idealStateRead =
-        _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
-    // Fail with an assertion, not an NPE, when nothing is read back.
-    AssertJUnit.assertNotNull(idealStateRead);
-    AssertJUnit.assertEquals(idealState.getRecord(), idealStateRead.getRecord());
-  }
-
-  /**
-   * Writes an ideal state directly, removes it through the accessor and checks that both the
-   * znode and the accessor's view of it are gone.
-   */
-  @Test ()
-  public void testRemove()
-  {
-    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
-    recreateAutoIdealStateNode(path);
-
-    boolean success = _accessor.removeProperty(PropertyType.IDEALSTATES, resource);
-    AssertJUnit.assertTrue(success);
-    AssertJUnit.assertFalse(_zkClient.exists(path));
-    IdealState idealStateRead =
-        _accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resource);
-    AssertJUnit.assertNull(idealStateRead);
-  }
-
-  /**
-   * Updates an existing ideal state through the accessor and checks that the stored record
-   * matches, the creation time is unchanged and the modification time has advanced.
-   */
-  @Test ()
-  public void testUpdate()
-  {
-    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName, resource);
-    IdealState idealState = recreateAutoIdealStateNode(path);
-    Stat stat = _zkClient.getStat(path);
-
-    idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
-
-    boolean success = _accessor.updateProperty(PropertyType.IDEALSTATES, idealState, resource);
-    AssertJUnit.assertTrue(success);
-    AssertJUnit.assertTrue(_zkClient.exists(path));
-    ZNRecord value = _zkClient.readData(path);
-    AssertJUnit.assertEquals(idealState.getRecord(), value);
-    Stat newstat = _zkClient.getStat(path);
-
-    AssertJUnit.assertEquals(stat.getCtime(), newstat.getCtime());
-    // The former assertNotSame(mtime, mtime) compared two autoboxed Long references, which
-    // says nothing about the values; the strict "<" below covers "changed" on its own.
-    AssertJUnit.assertTrue("mtime should advance after update: " + stat.getMtime() + " -> "
-        + newstat.getMtime(), stat.getMtime() < newstat.getMtime());
-  }
-
-  /**
-   * Expects no child values for the external-view lookup issued below.
-   */
-  @Test ()
-  public void testGetChildValues()
-  {
-    // NOTE(review): _clusterName is passed as a key here - confirm that is intended.
-    List<ExternalView> list =
-        _accessor.getChildValues(ExternalView.class, PropertyType.EXTERNALVIEW, _clusterName);
-    AssertJUnit.assertEquals(0, list.size());
-  }
-
-  /**
-   * Deletes and re-creates a config znode behind the accessor's back and expects the next
-   * read through the accessor to return the new record.
-   */
-  @Test
-  public void testBackToBackRemoveAndSet()
-  {
-    // CONFIG is cached
-    _accessor.setProperty(PropertyType.CONFIGS, new ZNRecord("id1"), "config1");
-    ZNRecord record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
-    Assert.assertNotNull(record, "Should read back config1 after set");
-    Assert.assertEquals(record.getId(), "id1");
-    String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName, "config1");
-    _zkClient.delete(path);
-    _zkClient.createPersistent(path, new ZNRecord("id1-new"));
-    record = _accessor.getProperty(PropertyType.CONFIGS, "config1");
-    Assert.assertNotNull(record, "Should read back config1 after re-create");
-    Assert.assertEquals(record.getId(), "id1-new",
-        "Should update cache since creation time is changed.");
-  }
-
-  /**
-   * Opens the ZkClient, removes any existing znode tree for this test's cluster and creates
-   * the accessor under test.
-   *
-   * @throws Exception if setup fails
-   */
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
-
-    System.out.println("START TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-
-    if (_zkClient.exists("/" + _clusterName))
-    {
-      _zkClient.deleteRecursive("/" + _clusterName);
-    }
-    _accessor = new ZKDataAccessor(_clusterName, _zkClient);
-  }
-
-  /**
-   * Closes the ZkClient opened in {@link #beforeClass()}.
-   */
-  @AfterClass
-  public void afterClass()
-  {
-    _zkClient.close();
-    System.out.println("END TestZKDataAccessor at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * Deletes {@code path}, ensures its parent exists and writes a fresh AUTO-mode ideal state
-   * for {@link #resource} directly through the ZkClient.
-   *
-   * @param path znode path of the ideal state
-   * @return the ideal state whose record was written
-   */
-  private IdealState recreateAutoIdealStateNode(String path)
-  {
-    IdealState idealState = new IdealState(resource);
-    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
-
-    _zkClient.delete(path);
-    _zkClient.createPersistent(parentPath(path), true);
-    _zkClient.createPersistent(path, idealState.getRecord());
-    return idealState;
-  }
-
-  /**
-   * Returns the parent of a '/'-separated znode path. Done on the string so the result does
-   * not depend on the platform file separator, as {@code java.io.File#getParent()} does.
-   *
-   * @param path absolute znode path
-   * @return the parent path, or "/" when {@code path} has no deeper parent
-   */
-  private static String parentPath(String path)
-  {
-    int lastSlash = path.lastIndexOf('/');
-    return lastSlash > 0 ? path.substring(0, lastSlash) : "/";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessorCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessorCache.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessorCache.java
deleted file mode 100644
index b24d2e8..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKDataAccessorCache.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-
-public class TestZKDataAccessorCache extends ZkUnitTestBase
-{
- private static Logger LOG = Logger.getLogger(TestZKDataAccessorCache.class);
- private ZKDataAccessor _accessor;
- private String _clusterName;
- private ZkClient _zkClient;
-
- @BeforeClass
- public void beforeClass() throws IOException, Exception
- {
- _clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
-
- System.out.println("START TestZKCacheDataAccessor at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
-
- if (_zkClient.exists("/" + _clusterName))
- {
- _zkClient.deleteRecursive("/" + _clusterName);
- }
- _zkClient.createPersistent(
- PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
- ConfigScopeProperty.CLUSTER.toString(), _clusterName), true);
- _zkClient.createPersistent(
- PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
- ConfigScopeProperty.PARTICIPANT.toString()), true);
- _zkClient.createPersistent(
- PropertyPathConfig.getPath(PropertyType.CONFIGS, _clusterName,
- ConfigScopeProperty.RESOURCE.toString()), true);
- _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName),
- true);
- _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName),
- true);
- _zkClient.createPersistent(
- PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, _clusterName), true);
- _zkClient.createPersistent(
- PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, _clusterName), true);
- _zkClient.createPersistent(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
- "localhost_12918", "123456"), true);
-
- _accessor = new ZKDataAccessor(_clusterName, _zkClient);
- }
-
- @AfterClass
- public void afterClass()
- {
- _zkClient.close();
- System.out.println("END TestZKCacheDataAccessor at " + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testAccessorCache()
- {
- testAccessorCache(PropertyType.IDEALSTATES);
- testAccessorCache(PropertyType.STATEMODELDEFS);
- testAccessorCache(PropertyType.LIVEINSTANCES);
- testAccessorCache(PropertyType.CONFIGS, ConfigScopeProperty.PARTICIPANT.toString());
- testAccessorCache(PropertyType.EXTERNALVIEW);
- testAccessorCache(PropertyType.CURRENTSTATES, "localhost_12918", "123456");
- }
-
- private void testAccessorCache(PropertyType type, String... keys)
- {
- String parentPath = PropertyPathConfig.getPath(type, _clusterName, keys);
- _zkClient.createPersistent(parentPath + "/child1", new ZNRecord("child1"));
- ZNRecord record2 = new ZNRecord("child2");
- _zkClient.createPersistent(parentPath + "/child2", record2);
-
- List<ZNRecord> records = _accessor.getChildValues(type, keys);
- LOG.debug("records:" + records);
- Assert.assertNotNull(getRecord(records, "child1"));
- Assert.assertNotNull(getRecord(records, "child2"));
-
- // no data change
- List<ZNRecord> newRecords = _accessor.getChildValues(type, keys);
- LOG.debug("new records:" + newRecords);
- Assert.assertEquals(getRecord(newRecords, "child1"), getRecord(records, "child1"));
-
- // change value of an existing znode
- record2.setSimpleField("key1", "value1");
- _zkClient.writeData(parentPath + "/child2", record2);
- newRecords = _accessor.getChildValues(type, keys);
- LOG.debug("new records:" + newRecords);
- Assert.assertEquals(getRecord(newRecords, "child2").getSimpleField("key1"), "value1");
- Assert.assertNotSame(getRecord(newRecords, "child2"), getRecord(records, "child2"));
-
- // add a new child
- _zkClient.createPersistent(parentPath + "/child3", new ZNRecord("child3"));
- records = newRecords;
- newRecords = _accessor.getChildValues(type, keys);
- LOG.debug("new records:" + newRecords);
- Assert.assertNull(getRecord(records, "child3"));
- Assert.assertNotNull(getRecord(newRecords, "child3"));
-
- // delete a child
- _zkClient.delete(parentPath + "/child2");
- records = newRecords;
- newRecords = _accessor.getChildValues(type, keys);
- LOG.debug("new records:" + newRecords);
- Assert.assertNotNull(getRecord(records, "child2"));
- Assert.assertNull(getRecord(newRecords, "child2"),
- "Should be null, since child2 has been deleted");
- }
-
- private ZNRecord getRecord(List<ZNRecord> list, String id)
- {
- for (ZNRecord record : list)
- {
- if (record.getId().equals(id))
- {
- return record;
- }
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKLiveInstanceData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKLiveInstanceData.java
deleted file mode 100644
index 581479a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKLiveInstanceData.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.tools.ClusterSetup;
-
-/**
- * Checks that a {@link LiveInstanceChangeListener} registered on an administrator manager is
- * called when a participant joins, updates its live-instance record and leaves.
- */
-public class TestZKLiveInstanceData extends ZkUnitTestBase
-{
-  private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
-
-  /**
-   * Connects an administrator with a live-instance listener, then connects, updates and
-   * disconnects a participant, asserting on the snapshot queued by each callback.
-   *
-   * @throws Exception if a manager call throws or the thread is interrupted
-   */
-  @Test
-  public void testDataChange() throws Exception
-  {
-    // Create an admin and add LiveInstanceChange listener to it
-    HelixManager adminManager =
-        HelixManagerFactory.getZKHelixManager(clusterName,
-                                              null,
-                                              InstanceType.ADMINISTRATOR,
-                                              ZK_ADDR);
-    adminManager.connect();
-    HelixManager manager = null;
-    try
-    {
-      final BlockingQueue<List<LiveInstance>> changeList =
-          new LinkedBlockingQueue<List<LiveInstance>>();
-
-      adminManager.addLiveInstanceChangeListener(new LiveInstanceChangeListener()
-      {
-        @Override
-        public void onLiveInstanceChange(List<LiveInstance> liveInstances,
-                                         NotificationContext changeContext)
-        {
-          // The queue is basically unbounded, so shouldn't throw exception when calling
-          // "add".
-          changeList.add(deepCopy(liveInstances));
-        }
-      });
-
-      // Check the initial condition
-      List<LiveInstance> instances = changeList.poll(1, TimeUnit.SECONDS);
-      Assert.assertNotNull(instances, "Expecting a list of live instance");
-      Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
-
-      // Join as participant, should trigger a live instance change event
-      manager =
-          HelixManagerFactory.getZKHelixManager(clusterName,
-                                                "localhost_54321",
-                                                InstanceType.PARTICIPANT,
-                                                ZK_ADDR);
-      manager.connect();
-      instances = changeList.poll(1, TimeUnit.SECONDS);
-      Assert.assertNotNull(instances, "Expecting a list of live instance");
-      Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
-      Assert.assertEquals(instances.get(0).getInstanceName(), manager.getInstanceName());
-
-      // Update data in the live instance node, should trigger another live instance change
-      // event
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      PropertyKey propertyKey =
-          helixDataAccessor.keyBuilder().liveInstance(manager.getInstanceName());
-      LiveInstance instance = helixDataAccessor.getProperty(propertyKey);
-
-      Map<String, String> map = new TreeMap<String, String>();
-      map.put("k1", "v1");
-      instance.getRecord().setMapField("test", map);
-      Assert.assertTrue(helixDataAccessor.updateProperty(propertyKey, instance),
-                        "Failed to update live instance node");
-
-      instances = changeList.poll(1, TimeUnit.SECONDS);
-      Assert.assertNotNull(instances, "Expecting a list of live instance");
-      // Guard get(0) so an empty snapshot fails as an assertion, not an
-      // IndexOutOfBoundsException.
-      Assert.assertEquals(instances.size(), 1, "Expecting one live instance");
-      Assert.assertEquals(instances.get(0).getRecord().getMapField("test"),
-                          map,
-                          "Wrong map data.");
-      manager.disconnect();
-      Thread.sleep(1000); // wait for callback finish
-
-      instances = changeList.poll(1, TimeUnit.SECONDS);
-      Assert.assertNotNull(instances, "Expecting a list of live instance");
-      Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
-    }
-    finally
-    {
-      // Release both connections even when an assertion above fails, so a failing run does
-      // not leave connected managers behind for later tests.
-      if (manager != null && manager.isConnected())
-      {
-        manager.disconnect();
-      }
-      adminManager.disconnect();
-    }
-  }
-
-  /**
-   * Deletes any existing znode tree for this cluster, then uses {@link ClusterSetup} to add
-   * the cluster and the nodes localhost:54321 and localhost:54322.
-   *
-   * @throws Exception if cluster setup fails
-   */
-  @BeforeClass()
-  public void beforeClass() throws Exception
-  {
-    ZkClient zkClient = null;
-    try
-    {
-      zkClient = new ZkClient(ZK_ADDR);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
-      if (zkClient.exists("/" + clusterName))
-      {
-        zkClient.deleteRecursive("/" + clusterName);
-      }
-    }
-    finally
-    {
-      if (zkClient != null)
-      {
-        zkClient.close();
-      }
-    }
-
-    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
-                                                ZK_ADDR,
-                                                "-addCluster",
-                                                clusterName));
-    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
-                                                ZK_ADDR,
-                                                "-addNode",
-                                                clusterName,
-                                                "localhost:54321"));
-    ClusterSetup.processCommandLineArgs(getArgs("-zkSvr",
-                                                ZK_ADDR,
-                                                "-addNode",
-                                                clusterName,
-                                                "localhost:54322"));
-  }
-
-  /**
-   * Packs its varargs into a {@code String[]} for {@code processCommandLineArgs}.
-   *
-   * @param args command-line tokens
-   * @return the same tokens as an array
-   */
-  private String[] getArgs(String... args)
-  {
-    return args;
-  }
-
-  /**
-   * Builds a new list holding a new {@link LiveInstance} for each input element's record.
-   *
-   * @param instances live instances passed to the listener
-   * @return a separate list of newly constructed live instances
-   */
-  private List<LiveInstance> deepCopy(List<LiveInstance> instances)
-  {
-    List<LiveInstance> result = new ArrayList<LiveInstance>();
-    for (LiveInstance instance : instances)
-    {
-      result.add(new LiveInstance(instance.getRecord()));
-    }
-    return result;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKPropertyTransferServer.java
deleted file mode 100644
index 5cb0d99..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKPropertyTransferServer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-
-/**
- * Checks the participants' {@code _zkPropertyTransferSvcUrl} across a controller restart:
- * null or empty after the controller disconnects, and equal to the
- * {@link ZKPropertyTransferServer} web-service URL after a new controller is started.
- */
-public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithPropertyServerCheck
-{
-  private static final Logger LOG =
-      Logger.getLogger(TestZKPropertyTransferServer.class);
-
-  /**
-   * Disconnects controller 0, verifies the participants' service URL, restarts the
-   * controller and verifies the URL again.
-   *
-   * @throws Exception if the controller cannot be restarted or the thread is interrupted
-   */
-  @Test
-  public void TestControllerChange() throws Exception
-  {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _startCMResultMap.get(controllerName)._manager.disconnect();
-
-    Thread.sleep(1000);
-
-    // kill controller, participant should not know about the svc url
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String svcUrl = participantAccessor(i)._zkPropertyTransferSvcUrl;
-      Assert.assertTrue(svcUrl == null || svcUrl.equals(""),
-          "Participant " + i + " should not have a svc url, but has: " + svcUrl);
-    }
-    _startCMResultMap.get(controllerName)._thread.interrupt();
-    _startCMResultMap.remove(controllerName);
-
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME,
-                                   controllerName,
-                                   ZK_ADDR,
-                                   HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
-
-    Thread.sleep(1000);
-
-    // create controller again, the svc url is notified to the participants
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      // assertEquals reports both values on a mismatch and, unlike the former
-      // assertTrue(url.equals(...)), does not throw an NPE when the url is still null.
-      Assert.assertEquals(participantAccessor(i)._zkPropertyTransferSvcUrl,
-                          ZKPropertyTransferServer.getInstance().getWebserviceUrl());
-    }
-  }
-
-  /**
-   * Looks up the data accessor of the i-th participant's manager in
-   * {@code _startCMResultMap}.
-   *
-   * @param i participant index, added to {@code START_PORT} to form the instance name
-   * @return the participant's accessor cast to {@link ZKHelixDataAccessor}
-   */
-  private ZKHelixDataAccessor participantAccessor(int i)
-  {
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-    HelixDataAccessor accessor =
-        _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
-    return (ZKHelixDataAccessor) accessor;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKUtil.java
deleted file mode 100644
index dd7656b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZKUtil.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-
-/**
- * Tests the static helpers of {@link ZKUtil} against the participant-config subtree of a
- * cluster created in {@link #beforeClass()}.
- */
-public class TestZKUtil extends ZkUnitTestBase
-{
-  private static final Logger LOG = Logger.getLogger(TestZKUtil.class);
-
-  String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
-  ZkClient _zkClient;
-
-  /**
-   * Opens the ZkClient, wipes any existing tree for the cluster, checks that
-   * {@code isClusterSetup} is false for the missing cluster and for null arguments, then
-   * sets up an empty cluster.
-   *
-   * @throws Exception if setup fails
-   */
-  @BeforeClass()
-  public void beforeClass() throws Exception
-  {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    if (_zkClient.exists("/" + clusterName))
-    {
-      _zkClient.deleteRecursive("/" + clusterName);
-    }
-
-    boolean result = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    AssertJUnit.assertFalse(result);
-    result = ZKUtil.isClusterSetup(null, _zkClient);
-    AssertJUnit.assertFalse(result);
-
-    result = ZKUtil.isClusterSetup(null, null);
-    AssertJUnit.assertFalse(result);
-
-    result = ZKUtil.isClusterSetup(clusterName, null);
-    AssertJUnit.assertFalse(result);
-
-    TestHelper.setupEmptyCluster(_zkClient, clusterName);
-  }
-
-  /**
-   * Closes the ZkClient opened in {@link #beforeClass()}.
-   */
-  @AfterClass()
-  public void afterClass()
-  {
-    _zkClient.close();
-  }
-
-  /**
-   * Expects {@code isClusterSetup} to be true once the empty cluster has been set up.
-   */
-  @Test()
-  public void testIsClusterSetup()
-  {
-    boolean result = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    AssertJUnit.assertTrue(result);
-  }
-
-  /**
-   * Creates two children, reads them back, drops them, and also calls {@code dropChildren}
-   * with an already-dropped record and with a null list.
-   */
-  @Test()
-  public void testChildrenOperations()
-  {
-    List<ZNRecord> list = new ArrayList<ZNRecord>();
-    list.add(new ZNRecord("id1"));
-    list.add(new ZNRecord("id2"));
-    String path = PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.PARTICIPANT.toString());
-    ZKUtil.createChildren(_zkClient, path, list);
-    list = ZKUtil.getChildren(_zkClient, path);
-    AssertJUnit.assertEquals(2, list.size());
-
-    ZKUtil.dropChildren(_zkClient, path, list);
-    // id1 was already removed by the list drop above; this call repeats the drop for it.
-    ZKUtil.dropChildren(_zkClient, path, new ZNRecord("id1"));
-    list = ZKUtil.getChildren(_zkClient, path);
-    AssertJUnit.assertEquals(0, list.size());
-
-    // The cast selects the List overload for the null argument.
-    ZKUtil.dropChildren(_zkClient, path, (List<ZNRecord>) null);
-  }
-
-  /**
-   * Expects {@code updateIfExists} to leave a missing znode missing and to write the record
-   * once the znode exists.
-   */
-  @Test()
-  public void testUpdateIfExists()
-  {
-    String path = participantConfigPath("id3");
-    ZNRecord record = new ZNRecord("id4");
-    ZKUtil.updateIfExists(_zkClient, path, record, false);
-    AssertJUnit.assertFalse(_zkClient.exists(path));
-    _zkClient.createPersistent(path);
-    ZKUtil.updateIfExists(_zkClient, path, record, false);
-    AssertJUnit.assertTrue(_zkClient.exists(path));
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertNotNull(record);
-    AssertJUnit.assertEquals("id4", record.getId());
-  }
-
-  /**
-   * Expects {@code subtract} to remove a simple field that is present in both the stored
-   * record and the record passed in.
-   */
-  @Test()
-  public void testSubtract()
-  {
-    String path = participantConfigPath("id5");
-    ZNRecord record = new ZNRecord("id5");
-    record.setSimpleField("key1", "value1");
-    _zkClient.createPersistent(path, record);
-    ZKUtil.subtract(_zkClient, path, record);
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertNotNull(record);
-    AssertJUnit.assertNull(record.getSimpleField("key1"));
-  }
-
-  /**
-   * Calls {@code createChildren} with a null list; passes as long as nothing is thrown.
-   */
-  @Test()
-  public void testNullChildren()
-  {
-    String path = participantConfigPath("id6");
-    ZKUtil.createChildren(_zkClient, path, (List<ZNRecord>) null);
-  }
-
-  /**
-   * Expects {@code createOrUpdate} on a missing znode to store the given record.
-   */
-  @Test()
-  public void testCreateOrUpdate()
-  {
-    String path = participantConfigPath("id7");
-    ZNRecord record = new ZNRecord("id7");
-    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertNotNull(record);
-    AssertJUnit.assertEquals("id7", record.getId());
-  }
-
-  /**
-   * Expects {@code createOrReplace} to store the record on first use and to replace it with
-   * a different record on the second call.
-   */
-  @Test()
-  public void testCreateOrReplace()
-  {
-    String path = participantConfigPath("id8");
-    ZNRecord record = new ZNRecord("id8");
-    ZKUtil.createOrReplace(_zkClient, path, record, true);
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertNotNull(record);
-    AssertJUnit.assertEquals("id8", record.getId());
-    record = new ZNRecord("id9");
-    ZKUtil.createOrReplace(_zkClient, path, record, true);
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertNotNull(record);
-    AssertJUnit.assertEquals("id9", record.getId());
-  }
-
-  /**
-   * Builds the participant-scope config path for {@code id} in this test's cluster.
-   *
-   * @param id last path key
-   * @return the path from {@code PropertyPathConfig.getPath} for CONFIGS / PARTICIPANT / id
-   */
-  private String participantConfigPath(String id)
-  {
-    return PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.PARTICIPANT.toString(), id);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZNRecordSizeLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZNRecordSizeLimit.java
deleted file mode 100644
index 9ed3a34..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZNRecordSizeLimit.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Arrays;
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-
-public class TestZNRecordSizeLimit extends ZkUnitTestBase
-{
- private static Logger LOG = Logger.getLogger(TestZNRecordSizeLimit.class);
-
- @Test
- public void testZNRecordSizeLimitUseZNRecordSerializer()
- {
- String className = getShortClassName();
- System.out.println("START testZNRecordSizeLimitUseZNRecordSerializer at "
- + new Date(System.currentTimeMillis()));
-
- ZNRecordSerializer serializer = new ZNRecordSerializer();
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(serializer);
- String root = className;
- byte[] buf = new byte[1024];
- for (int i = 0; i < 1024; i++)
- {
- buf[i] = 'a';
- }
- String bufStr = new String(buf);
-
- // test zkClient
- // legal-sized data gets written to zk
- // write a znode of size less than 1m
- final ZNRecord smallRecord = new ZNRecord("normalsize");
- smallRecord.getSimpleFields().clear();
- for (int i = 0; i < 900; i++)
- {
- smallRecord.setSimpleField(i + "", bufStr);
- }
-
- String path1 = "/" + root + "/test1";
- zkClient.createPersistent(path1, true);
- zkClient.writeData(path1, smallRecord);
-
- ZNRecord record = zkClient.readData(path1);
- Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
- // oversized data doesn't create any data on zk
- // prepare a znode of size larger than 1m
- final ZNRecord largeRecord = new ZNRecord("oversize");
- largeRecord.getSimpleFields().clear();
- for (int i = 0; i < 1024; i++)
- {
- largeRecord.setSimpleField(i + "", bufStr);
- }
- String path2 = "/" + root + "/test2";
- zkClient.createPersistent(path2, true);
- try
- {
- zkClient.writeData(path2, largeRecord);
- Assert.fail("Should fail because data size is larger than 1M");
- }
- catch (HelixException e)
- {
- // OK
- }
- record = zkClient.readData(path2);
- Assert.assertNull(record);
-
- // oversized write doesn't overwrite existing data on zk
- record = zkClient.readData(path1);
- try
- {
- zkClient.writeData(path1, largeRecord);
- Assert.fail("Should fail because data size is larger than 1M");
- }
- catch (HelixException e)
- {
- // OK
- }
- ZNRecord recordNew = zkClient.readData(path1);
- byte[] arr = serializer.serialize(record);
- byte[] arrNew = serializer.serialize(recordNew);
- Assert.assertTrue(Arrays.equals(arr, arrNew));
-
- // test ZkDataAccessor
- ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
- admin.addCluster(className, true);
- InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
- admin.addInstance(className, instanceConfig);
-
- // oversized data should not create any new data on zk
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- IdealState idealState = new IdealState("currentState");
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
-
- for (int i = 0; i < 1024; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- Assert.assertFalse(succeed);
- HelixProperty property =
- accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918",
- "session_1",
- "partition_1"));
- Assert.assertNull(property);
-
- // legal sized data gets written to zk
- idealState.getRecord().getSimpleFields().clear();
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
-
- for (int i = 0; i < 900; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
- Assert.assertTrue(succeed);
- record =
- accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
- Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
- // oversized data should not update existing data on zk
- idealState.getRecord().getSimpleFields().clear();
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
- for (int i = 900; i < 1024; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- // System.out.println("record: " + idealState.getRecord());
- succeed =
- accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
- Assert.assertFalse(succeed);
- recordNew =
- accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
- arr = serializer.serialize(record);
- arrNew = serializer.serialize(recordNew);
- Assert.assertTrue(Arrays.equals(arr, arrNew));
-
- System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
- + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testZNRecordSizeLimitUseZNRecordStreamingSerializer()
- {
- String className = getShortClassName();
- System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
- + new Date(System.currentTimeMillis()));
-
- ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(serializer);
- String root = className;
- byte[] buf = new byte[1024];
- for (int i = 0; i < 1024; i++)
- {
- buf[i] = 'a';
- }
- String bufStr = new String(buf);
-
- // test zkClient
- // legal-sized data gets written to zk
- // write a znode of size less than 1m
- final ZNRecord smallRecord = new ZNRecord("normalsize");
- smallRecord.getSimpleFields().clear();
- for (int i = 0; i < 900; i++)
- {
- smallRecord.setSimpleField(i + "", bufStr);
- }
-
- String path1 = "/" + root + "/test1";
- zkClient.createPersistent(path1, true);
- zkClient.writeData(path1, smallRecord);
-
- ZNRecord record = zkClient.readData(path1);
- Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
- // oversized data doesn't create any data on zk
- // prepare a znode of size larger than 1m
- final ZNRecord largeRecord = new ZNRecord("oversize");
- largeRecord.getSimpleFields().clear();
- for (int i = 0; i < 1024; i++)
- {
- largeRecord.setSimpleField(i + "", bufStr);
- }
- String path2 = "/" + root + "/test2";
- zkClient.createPersistent(path2, true);
- try
- {
- zkClient.writeData(path2, largeRecord);
- Assert.fail("Should fail because data size is larger than 1M");
- }
- catch (HelixException e)
- {
- // OK
- }
- record = zkClient.readData(path2);
- Assert.assertNull(record);
-
- // oversized write doesn't overwrite existing data on zk
- record = zkClient.readData(path1);
- try
- {
- zkClient.writeData(path1, largeRecord);
- Assert.fail("Should fail because data size is larger than 1M");
- }
- catch (HelixException e)
- {
- // OK
- }
- ZNRecord recordNew = zkClient.readData(path1);
- byte[] arr = serializer.serialize(record);
- byte[] arrNew = serializer.serialize(recordNew);
- Assert.assertTrue(Arrays.equals(arr, arrNew));
-
- // test ZkDataAccessor
- ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
- admin.addCluster(className, true);
- InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
- admin.addInstance(className, instanceConfig);
-
- // oversized data should not create any new data on zk
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
-// ZNRecord statusUpdates = new ZNRecord("statusUpdates");
- IdealState idealState = new IdealState("currentState");
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
-
- for (int i = 0; i < 1024; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- boolean succeed =
- accessor.setProperty(keyBuilder.idealStates("TestDB_1"),
- idealState);
- Assert.assertFalse(succeed);
- HelixProperty property =
- accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
- Assert.assertNull(property);
-
- // legal sized data gets written to zk
- idealState.getRecord().getSimpleFields().clear();
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
-
- for (int i = 0; i < 900; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- succeed =
- accessor.setProperty(keyBuilder.idealStates("TestDB_2"),
- idealState);
- Assert.assertTrue(succeed);
- record =
- accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
- Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
- // oversized data should not update existing data on zk
- idealState.getRecord().getSimpleFields().clear();
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setIdealStateMode("AUTO");
- idealState.setNumPartitions(10);
-
- for (int i = 900; i < 1024; i++)
- {
- idealState.getRecord().setSimpleField(i + "", bufStr);
- }
- // System.out.println("record: " + idealState.getRecord());
- succeed =
- accessor.updateProperty(keyBuilder.idealStates("TestDB_2"),
- idealState);
- Assert.assertFalse(succeed);
- recordNew =
- accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
- arr = serializer.serialize(record);
- arrNew = serializer.serialize(recordNew);
- Assert.assertTrue(Arrays.equals(arr, arrNew));
-
- System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
- + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkBaseDataAccessor.java
deleted file mode 100644
index b734212..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkBaseDataAccessor.java
+++ /dev/null
@@ -1,315 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.ZkUnitTestBase;
-
-public class TestZkBaseDataAccessor extends ZkUnitTestBase
-{
- @Test
- public void testSyncZkBaseDataAccessor()
- {
- System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
-
- String root = "TestZkBaseDataAccessor_syn";
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- zkClient.deleteRecursive("/" + root);
-
- BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-
- // test sync create
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- boolean success = accessor.create(path, new ZNRecord(msgId), AccessOption.PERSISTENT);
- Assert.assertTrue(success, "Should succeed in create");
- }
-
- // test get what we created
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getId(), msgId, "Should get what we created");
- }
-
- // test sync set
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord newRecord = new ZNRecord(msgId);
- newRecord.setSimpleField("key1", "value1");
- boolean success = accessor.set(path, newRecord, AccessOption.PERSISTENT);
- Assert.assertTrue(success, "Should succeed in set");
- }
-
- // test get what we set
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
- Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
- }
-
- // test sync update
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord newRecord = new ZNRecord(msgId);
- newRecord.setSimpleField("key2", "value2");
- boolean success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
- Assert.assertTrue(success, "Should succeed in update");
- }
-
- // test get what we updated
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
- Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
- }
-
- // test sync get
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- ZNRecord record = accessor.get(path, null, 0);
- Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
- Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
- Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
- }
-
- // test sync exist
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- boolean exists = accessor.exists(path, 0);
- Assert.assertTrue(exists, "Should exist");
- }
-
- // test getStat()
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- Stat stat = accessor.getStat(path, 0);
- Assert.assertNotNull(stat, "Stat should exist");
- Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
- }
-
- // test sync remove
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- boolean success = accessor.remove(path, 0);
- Assert.assertTrue(success, "Should remove");
- }
-
- // test get what we removed
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
- boolean exists = zkClient.exists(path);
- Assert.assertFalse(exists, "Should be removed");
- }
-
- zkClient.close();
- System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testAsyncZkBaseDataAccessor()
- {
- System.out.println("START TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
-
- String root = "TestZkBaseDataAccessor_asyn";
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- zkClient.deleteRecursive("/" + root);
-
- ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-
- // test async createChildren
- String parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- List<ZNRecord> records = new ArrayList<ZNRecord>();
- List<String> paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
- records.add(new ZNRecord(msgId));
- }
- boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertTrue(success[i], "Should succeed in create " + msgId);
- }
-
- // test get what we created
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getId(), msgId, "Should get what we created");
- }
-
- // test async setChildren
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- records = new ArrayList<ZNRecord>();
- paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
- ZNRecord newRecord = new ZNRecord(msgId);
- newRecord.setSimpleField("key1", "value1");
- records.add(newRecord);
- }
- success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertTrue(success[i], "Should succeed in set " + msgId);
- }
-
- // test get what we set
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
- Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
- }
-
- // test async updateChildren
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
-// records = new ArrayList<ZNRecord>();
- List<DataUpdater<ZNRecord>> znrecordUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
- paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
- ZNRecord newRecord = new ZNRecord(msgId);
- newRecord.setSimpleField("key2", "value2");
-// records.add(newRecord);
- znrecordUpdaters.add(new ZNRecordUpdater(newRecord));
- }
- success = accessor.updateChildren(paths, znrecordUpdaters, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertTrue(success[i], "Should succeed in update " + msgId);
- }
-
- // test get what we updated
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
- ZNRecord record = zkClient.readData(path);
- Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
- Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
- }
-
- // test async getChildren
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- records = accessor.getChildren(parentPath, null, 0);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- ZNRecord record = records.get(i);
- Assert.assertEquals(record.getId(), msgId, "Should get what we updated");
- Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
- Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
- }
-
- // test async exists
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
- }
- boolean[] exists = accessor.exists(paths, 0);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertTrue(exists[i], "Should exist " + msgId);
- }
-
- // test async getStats
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
- }
- Stat[] stats = accessor.getStats(paths, 0);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertNotNull(stats[i], "Stat should exist for " + msgId);
- Assert.assertEquals(stats[i].getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1 for " + msgId);
- }
-
- // test async remove
- parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
- paths = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
- }
- success = accessor.remove(paths, 0);
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- Assert.assertTrue(success[i], "Should succeed in remove " + msgId);
- }
-
- // test get what we removed
- for (int i = 0; i < 10; i++)
- {
- String msgId = "msg_" + i;
- String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
- boolean pathExists = zkClient.exists(path);
- Assert.assertFalse(pathExists, "Should be removed " + msgId);
- }
-
- zkClient.close();
- System.out.println("END TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
deleted file mode 100644
index a78c549..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
+++ /dev/null
@@ -1,378 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.ZkUnitTestBase;
-
-public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase
-{
- @Test
- public void testHappyPathExtOpZkCacheBaseDataAccessor() throws Exception
- {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- // init external base data accessor
- ZkClient extZkclient = new ZkClient(ZK_ADDR);
- extZkclient.setZkSerializer(new ZNRecordSerializer());
- ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(extZkclient);
-
- // init zkCacheBaseDataAccessor
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- clusterName,
- "localhost_8901");
- String extViewPath =
- PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-
- extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
-
- List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
- ZkCacheBaseDataAccessor<ZNRecord> accessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
-
- // TestHelper.printCache(accessor._zkCache);
- boolean ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // create 10 current states using external base accessor
- List<String> paths = new ArrayList<String>();
- List<ZNRecord> records = new ArrayList<ZNRecord>();
- for (int i = 0; i < 10; i++)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- clusterName,
- "localhost_8901",
- "session_0",
- "TestDB" + i);
- ZNRecord record = new ZNRecord("TestDB" + i);
-
- paths.add(path);
- records.add(record);
- }
-
- boolean[] success = extBaseAccessor.createChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
- }
-
- // wait zkEventThread update zkCache
- Thread.sleep(100);
-
- // verify wtCache
- // TestHelper.printCache(accessor._zkCache);
-
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- // System.out.println("ret: " + ret);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // update each current state 10 times by external base accessor
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
- for (int j = 0; j < 10; j++)
- {
- paths.clear();
- updaters.clear();
- for (int i = 0; i < 10; i++)
- {
- String path = curStatePath + "/session_0/TestDB" + i;
- ZNRecord newRecord = new ZNRecord("TestDB" + i);
- newRecord.setSimpleField("" + j, "" + j);
- DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
- paths.add(path);
- updaters.add(updater);
- }
- success = extBaseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
- }
- }
-
- // wait zkEventThread update zkCache
- Thread.sleep(100);
-
- // verify cache
- // TestHelper.printCache(accessor._zkCache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // set 10 external views by external accessor
- paths.clear();
- records.clear();
- for (int i = 0; i < 10; i++)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
- ZNRecord record = new ZNRecord("TestDB" + i);
-
- paths.add(path);
- records.add(record);
- }
- success = extBaseAccessor.setChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
- }
-
- // wait zkEventThread update zkCache
- Thread.sleep(100);
-
- // verify cache
- // TestHelper.printCache(accessor._zkCache._cache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // remove 10 external views by external accessor
- paths.clear();
- for (int i = 0; i < 10; i++)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
-
- paths.add(path);
- }
- success = extBaseAccessor.remove(paths, 0);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in remove: " + paths.get(i));
- }
-
- // wait zkEventThread update zkCache
- Thread.sleep(100);
-
- // verify cache
- // TestHelper.printCache(accessor._zkCache._cache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // clean up
- extZkclient.close();
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- @Test
- public void testHappyPathSelfOpZkCacheBaseDataAccessor() throws Exception
- {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- // init zkCacheDataAccessor
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- clusterName,
- "localhost_8901");
- String extViewPath =
- PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-
- baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
-
- List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
- ZkCacheBaseDataAccessor<ZNRecord> accessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
-
- // TestHelper.printCache(accessor._zkCache._cache);
- boolean ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // create 10 current states using this accessor
- List<String> paths = new ArrayList<String>();
- List<ZNRecord> records = new ArrayList<ZNRecord>();
- for (int i = 0; i < 10; i++)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- clusterName,
- "localhost_8901",
- "session_0",
- "TestDB" + i);
- ZNRecord record = new ZNRecord("TestDB" + i);
-
- paths.add(path);
- records.add(record);
- }
-
- boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
- }
-
- // verify cache
-// TestHelper.printCache(accessor._zkCache._cache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- false);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // update each current state 10 times by this accessor
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
- for (int j = 0; j < 10; j++)
- {
- paths.clear();
- updaters.clear();
- for (int i = 0; i < 10; i++)
- {
- String path = curStatePath + "/session_0/TestDB" + i;
- ZNRecord newRecord = new ZNRecord("TestDB" + i);
- newRecord.setSimpleField("" + j, "" + j);
- DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
- paths.add(path);
- updaters.add(updater);
- }
- success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
- }
- }
-
- // verify cache
- // TestHelper.printCache(accessor._zkCache._cache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- // ret = TestHelper.verifyZkCache(zkCacheInitPaths, accessor, _gZkClient, true);
- // System.out.println("ret: " + ret);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // set 10 external views 10 times by this accessor
- paths.clear();
- records.clear();
- for (int j = 0; j < 10; j++)
- {
- for (int i = 0; i < 10; i++)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB"
- + i);
- ZNRecord record = new ZNRecord("TestDB" + i);
- record.setSimpleField("setKey", "" + j);
-
- paths.add(path);
- records.add(record);
- }
- success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
- }
- }
-
- // verify cache
- // TestHelper.printCache(accessor._zkCache._cache);
- ret =
- TestHelper.verifyZkCache(zkCacheInitPaths,
- accessor._zkCache._cache,
- _gZkClient,
- true);
- // System.out.println("ret: " + ret);
- Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-
- // get 10 external views
- paths.clear();
- records.clear();
- for (int i = 0; i < 10; i++)
- {
- String path = extViewPath + "/TestDB" + i;
- paths.add(path);
- }
-
- records = accessor.get(paths, null, 0);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
- }
-
- // getChildren
- records.clear();
- records = accessor.getChildren(extViewPath, null, 0);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
- }
-
- // // exists
- paths.clear();
- for (int i = 0; i < 10; i++)
- {
- String path = curStatePath + "/session_0/TestDB" + i;
- // // PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- // // clusterName,
- // // "localhost_8901",
- // // "session_0",
- // // "TestDB" + i);
- paths.add(path);
- }
- success = accessor.exists(paths, 0);
- for (int i = 0; i < 10; i++)
- {
- Assert.assertTrue(success[i], "Should exits: " + paths.get(i));
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
deleted file mode 100644
index c11354f..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.store.HelixPropertyListener;
-
-/**
- * Checks that a {@link ZkCacheBaseDataAccessor} keeps its cache in step with ZooKeeper and
- * fires the expected create / change / delete callbacks when znodes under a cached path are
- * created, updated and removed from a single thread.
- */
-public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase
-{
-  /**
-   * Listener that records the path handed to each callback, one queue per callback type, so
-   * the test can compare the received paths with the paths it touched.
-   */
-  class TestListener implements HelixPropertyListener
-  {
-    // Concurrent queues: presumably callbacks arrive on a thread other than the test thread
-    // (the test sleeps before reading them) -- confirm against ZkCacheBaseDataAccessor.
-    ConcurrentLinkedQueue<String> _deletePathQueue = new ConcurrentLinkedQueue<String>();
-    ConcurrentLinkedQueue<String> _createPathQueue = new ConcurrentLinkedQueue<String>();
-    ConcurrentLinkedQueue<String> _changePathQueue = new ConcurrentLinkedQueue<String>();
-
-    /** Records {@code path} as deleted. */
-    @Override
-    public void onDataDelete(String path)
-    {
-      _deletePathQueue.add(path);
-    }
-
-    /** Records {@code path} as created. */
-    @Override
-    public void onDataCreate(String path)
-    {
-      _createPathQueue.add(path);
-    }
-
-    /** Records {@code path} as changed. */
-    @Override
-    public void onDataChange(String path)
-    {
-      _changePathQueue.add(path);
-    }
-
-    /** Drops everything recorded so far so the next phase starts from empty queues. */
-    public void reset()
-    {
-      _deletePathQueue.clear();
-      _createPathQueue.clear();
-      _changePathQueue.clear();
-    }
-  }
-
-  /**
-   * Creates ten current-state znodes through a separate ("external") accessor, then updates
-   * and removes them through the cache accessor, and after each phase asserts that the cache
-   * matches ZooKeeper and that the listener saw exactly the expected callback paths.
-   *
-   * @throws Exception if a ZooKeeper operation fails or the thread is interrupted while
-   *           waiting for callbacks
-   */
-  @Test
-  public void testZkCacheCallbackExternalOpNoChroot() throws Exception
-  {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    // init external base data accessor on its own client, separate from _gZkClient
-    ZkClient zkclient = new ZkClient(ZK_ADDR);
-    try
-    {
-      zkclient.setZkSerializer(new ZNRecordSerializer());
-      ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
-          new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
-      // init zkCacheDataAccessor
-      String curStatePath =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                     clusterName,
-                                     "localhost_8901");
-      String extViewPath =
-          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
-
-      ZkBaseDataAccessor<ZNRecord> baseAccessor =
-          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-
-      // the cached root is created before the cache accessor is built and subscribed
-      extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
-
-      List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
-      ZkCacheBaseDataAccessor<ZNRecord> accessor =
-          new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, cachePaths);
-
-      TestListener listener = new TestListener();
-      accessor.subscribe(curStatePath, listener);
-
-      // create 10 current states through the external accessor
-      List<String> createPaths = new ArrayList<String>();
-      for (int i = 0; i < 10; i++)
-      {
-        String path = curStatePath + "/session_0/TestDB" + i;
-        createPaths.add(path);
-        boolean success =
-            extBaseAccessor.create(path,
-                                   new ZNRecord("TestDB" + i),
-                                   AccessOption.PERSISTENT);
-        Assert.assertTrue(success, "Should succeed in create: " + path);
-      }
-
-      // wait before inspecting the cache and the listener queues
-      Thread.sleep(500);
-
-      // verify cache
-      boolean ret =
-          TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
-      Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-      System.out.println("createCnt: " + listener._createPathQueue.size());
-      Assert.assertEquals(listener._createPathQueue.size(),
-                          11,
-                          "Shall get 11 onCreate callbacks.");
-
-      // verify each callback path: the 10 leaves plus their parent "session_0" node
-      createPaths.add(curStatePath + "/session_0");
-      List<String> createCallbackPaths = new ArrayList<String>(listener._createPathQueue);
-      Collections.sort(createPaths);
-      Collections.sort(createCallbackPaths);
-      Assert.assertEquals(createCallbackPaths,
-                          createPaths,
-                          "Should get create callbacks at " + createPaths + ", but was "
-                              + createCallbackPaths);
-
-      // update each current state, single thread; 10 updates per node, 100 in total
-      List<String> updatePaths = new ArrayList<String>();
-      listener.reset();
-      for (int i = 0; i < 10; i++)
-      {
-        String path = curStatePath + "/session_0/TestDB" + i;
-        for (int j = 0; j < 10; j++)
-        {
-          updatePaths.add(path);
-          ZNRecord newRecord = new ZNRecord("TestDB" + i);
-          newRecord.setSimpleField("" + j, "" + j);
-          boolean success =
-              accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
-          Assert.assertTrue(success, "Should succeed in update: " + path);
-        }
-      }
-      Thread.sleep(500);
-
-      // verify cache
-      ret =
-          TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
-      Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-      System.out.println("changeCnt: " + listener._changePathQueue.size());
-      Assert.assertEquals(listener._changePathQueue.size(),
-                          100,
-                          "Shall get 100 onChange callbacks.");
-
-      // verify each callback path
-      List<String> updateCallbackPaths = new ArrayList<String>(listener._changePathQueue);
-      Collections.sort(updatePaths);
-      Collections.sort(updateCallbackPaths);
-      Assert.assertEquals(updateCallbackPaths,
-                          updatePaths,
-                          "Should get change callbacks at " + updatePaths + ", but was "
-                              + updateCallbackPaths);
-
-      // remove 10 current states
-      List<String> removePaths = new ArrayList<String>();
-      listener.reset();
-      for (int i = 0; i < 10; i++)
-      {
-        String path = curStatePath + "/session_0/TestDB" + i;
-        removePaths.add(path);
-        boolean success = accessor.remove(path, AccessOption.PERSISTENT);
-        Assert.assertTrue(success, "Should succeed in remove: " + path);
-      }
-      Thread.sleep(500);
-
-      // verify cache
-      ret =
-          TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
-      Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
-      System.out.println("deleteCnt: " + listener._deletePathQueue.size());
-      Assert.assertEquals(listener._deletePathQueue.size(),
-                          10,
-                          "Shall get 10 onDelete callbacks.");
-
-      // verify each callback path
-      List<String> removeCallbackPaths = new ArrayList<String>(listener._deletePathQueue);
-      Collections.sort(removePaths);
-      Collections.sort(removeCallbackPaths);
-      Assert.assertEquals(removeCallbackPaths,
-                          removePaths,
-                          "Should get remove callbacks at " + removePaths + ", but was "
-                              + removeCallbackPaths);
-
-      System.out.println("END " + clusterName + " at "
-          + new Date(System.currentTimeMillis()));
-    }
-    finally
-    {
-      // this test opened its own connection; release it even when an assertion fails
-      zkclient.close();
-    }
-  }
-}