You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[41/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
new file mode 100644
index 0000000..1012402
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStore.java
@@ -0,0 +1,537 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+// TODO need to write performance test for zk-property store
+public class TestZKPropertyStore extends ZkUnitTestBase
+{
+ private static final Logger LOG = Logger.getLogger(TestZKPropertyStore.class);
+ // short class name; used in the START/END console messages and as the store root path
+ final String className = getShortClassName();
+ // number of characters in every simple-field value written by this test
+ final int bufSize = 128;
+ // number of "key_<n>" simple fields put into each record
+ final int mapNr = 10;
+ // number of first-level keys ("/node_<i>")
+ final int firstLevelNr = 10;
+ // number of second-level keys under each first-level key
+ final int secondLevelNr = 10;
+ // total number of second-level (leaf) keys written by setNodes()
+ final int totalNodes = firstLevelNr * secondLevelNr;
+
+ /**
+  * Change listener that remembers, per notified key, the wall-clock time of the most
+  * recent notification (epoch milliseconds rendered as a decimal string).
+  */
+ class TestListener implements PropertyChangeListener<ZNRecord>
+ {
+   Map<String, String> _keySet;
+
+   public TestListener(Map<String, String> keySet)
+   {
+     this._keySet = keySet;
+   }
+
+   @Override
+   public void onPropertyChange(String key)
+   {
+     // a later notification for the same key overwrites the earlier timestamp
+     _keySet.put(key, String.valueOf(System.currentTimeMillis()));
+   }
+ }
+
+ /**
+  * Updater that discards the current content: it returns a new record carrying the
+  * same id whose simple fields "key_0" .. "key_(mapNr-1)" each hold bufSize 'e' characters.
+  */
+ private class TestUpdater implements DataUpdater<ZNRecord>
+ {
+   @Override
+   public ZNRecord update(ZNRecord current)
+   {
+     // a freshly allocated char[] is all '\0', so the replace yields bufSize copies of 'e'
+     final String payload = new String(new char[bufSize]).replace('\0', 'e');
+
+     Map<String, String> fields = new TreeMap<String, String>();
+     int idx = 0;
+     while (idx < mapNr)
+     {
+       fields.put("key_" + idx, payload);
+       idx++;
+     }
+
+     ZNRecord replacement = new ZNRecord(current.getId());
+     replacement.setSimpleFields(fields);
+     return replacement;
+   }
+ }
+
+ /** Returns the record id of child j under first-level node i, i.e. "childNode_<i>_<j>". */
+ String getNodeId(int i, int j)
+ {
+   StringBuilder id = new StringBuilder("childNode_");
+   id.append(i).append("_").append(j);
+   return id.toString();
+ }
+
+ String getSecondLevelKey(int i, int j)
+ {
+ return "/node_" + i + "/" + getNodeId(i, j);
+ }
+
+ /** Returns the store key of first-level node i: "/node_<i>". */
+ String getFirstLevelKey(int i)
+ {
+   return new StringBuilder("/node_").append(i).toString();
+ }
+
+ // TODO: separate into small tests
+ @Test()
+ public void testZKPropertyStore() throws Exception
+ {
+ System.out.println("START " + className + " at "
+ + new Date(System.currentTimeMillis()));
+ // LOG.info("number of connections is " + ZkClient.getNumberOfConnections());
+
+ // clean up zk
+ final String propertyStoreRoot = buildPropertyStoreRoot();
+ if (_gZkClient.exists(propertyStoreRoot))
+ {
+ _gZkClient.deleteRecursive(propertyStoreRoot);
+ }
+
+ ZkClient zkclient = new ZkClient(ZK_ADDR);
+ zkclient.setZkSerializer(new ByteArraySerializer());
+ ZKPropertyStore<ZNRecord> store =
+ new ZKPropertyStore<ZNRecord>(zkclient,
+ new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+ propertyStoreRoot);
+
+ // test back to back add-delete-add
+
+ store.setProperty("child0", new ZNRecord("child0"));
+
+ ZNRecord record2 = store.getProperty("child0"); // will put the record in cache
+ String child0Path = propertyStoreRoot + "/child0";
+ _gZkClient.subscribeDataChanges(child0Path, new IZkDataListener()
+ {
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception
+ {
+ // TODO Auto-generated method stub
+ System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataDeleted()");
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception
+ {
+ // TODO Auto-generated method stub
+ System.out.println("TestZKPropertyStore.testZKPropertyStore().new IZkDataListener() {...}.handleDataChange()");
+ }
+ });
+ for (int i = 0; i < 2; i++)
+ {
+ _gZkClient.delete(child0Path);
+ _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new"));
+ }
+ record2 = store.getProperty("child0");
+ Assert.assertEquals(record2.getId(), "child0-new");
+ _gZkClient.delete(child0Path);
+ Thread.sleep(300); // should wait for zk callback to remove "child0" from cache
+ record2 = store.getProperty("child0");
+ Assert.assertNull(record2);
+
+ // zookeeper has a default 1M limit on size
+ char[] data = new char[bufSize];
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = 'a';
+ }
+
+ Map<String, String> map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+ String node = "node";
+ ZNRecord record = new ZNRecord(node);
+ record.setSimpleFields(map);
+
+ ZNRecordSerializer serializer = new ZNRecordSerializer();
+ int bytesPerNode = serializer.serialize(record).length;
+ System.out.println("use znode of size " + bytesPerNode / 1024 + "K");
+ Assert.assertTrue(bytesPerNode < 1024 * 1024,
+ "zookeeper has a default 1M limit on size");
+
+ // test getPropertyRootNamespace()
+ String root = store.getPropertyRootNamespace();
+ Assert.assertEquals(root, propertyStoreRoot);
+
+ // set 100 nodes and get 100 nodes, verify get what we set
+ long start = System.currentTimeMillis();
+ setNodes(store, 'a', false);
+ long end = System.currentTimeMillis();
+ System.out.println("ZKPropertyStore write throughput is " + bytesPerNode * totalNodes
+ / (end - start) + " kilo-bytes per second");
+
+ start = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ String key = getSecondLevelKey(i, j);
+ record = store.getProperty(key);
+ Assert.assertEquals(record.getId(), nodeId);
+ }
+ }
+ end = System.currentTimeMillis();
+ System.out.println("ZKPropertyStore read throughput is " + bytesPerNode * totalNodes
+ / (end - start) + " kilo-bytes per second");
+
+ // test subscribe
+ Map<String, String> keyMap = new TreeMap<String, String>();
+ // verify initial callbacks invoked for all 100 nodes
+ PropertyChangeListener<ZNRecord> listener = new TestListener(keyMap);
+ store.subscribeForPropertyChange("", listener);
+ System.out.println("keyMap size: " + keyMap.size());
+ Assert.assertTrue(keyMap.size() > 100);
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String key = getSecondLevelKey(i, j);
+ Assert.assertTrue(keyMap.containsKey(key));
+ }
+ }
+
+ // change nodes via property store interface
+ // and verify all notifications have been received (TODO: without latency?)
+ start = System.currentTimeMillis();
+ keyMap.clear();
+ setNodes(store, 'b', true);
+
+ // wait for all callbacks completed
+ for (int i = 0; i < 10; i++)
+ {
+ System.out.println("keySet size: " + keyMap.size());
+ if (keyMap.size() == totalNodes)
+ {
+ break;
+ }
+ Thread.sleep(500);
+ }
+ Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
+ + " callbacks");
+ end = System.currentTimeMillis();
+ long waitTime = (end - start) * 2;
+
+ long maxLatency = 0;
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String key = getSecondLevelKey(i, j);
+ Assert.assertTrue(keyMap.containsKey(key));
+ record = store.getProperty(key);
+ start = Long.parseLong(record.getSimpleField("SetTimestamp"));
+ end = Long.parseLong(keyMap.get(key));
+ long latency = end - start;
+ if (latency > maxLatency)
+ {
+ maxLatency = latency;
+ }
+ }
+ }
+ System.out.println("ZKPropertyStore callback latency is " + maxLatency
+ + " millisecond");
+
+ // change nodes via native zkclient interface
+ // and verify all notifications have been received with some latency
+ keyMap.clear();
+ setNodes(_gZkClient, propertyStoreRoot, 'a', true);
+
+ // wait for all callbacks completed
+ Thread.sleep(waitTime);
+ Assert.assertEquals(keyMap.size(), totalNodes, "should receive " + totalNodes
+ + " callbacks");
+
+ // remove node via native zkclient interface
+ // should receive callbacks on parent key
+ keyMap.clear();
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ int j = 0;
+ String key = getSecondLevelKey(i, j);
+ _gZkClient.delete(propertyStoreRoot + key);
+ }
+ Thread.sleep(waitTime);
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ String key = getFirstLevelKey(i);
+ Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
+ }
+
+ keyMap.clear();
+ for (int j = 1; j < secondLevelNr; j++)
+ {
+ int i = 0;
+ String key = getSecondLevelKey(i, j);
+ _gZkClient.delete(propertyStoreRoot + key);
+ }
+ Thread.sleep(waitTime);
+ String node0Key = getFirstLevelKey(0);
+ Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
+ + node0Key);
+
+ // add back removed nodes
+ // should receive callbacks on parent key
+ keyMap.clear();
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = 'a';
+ }
+
+ map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ int j = 0;
+ String nodeId = getNodeId(i, j);
+ String key = getSecondLevelKey(i, j);
+ record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ store.setProperty(key, record);
+ }
+ Thread.sleep(waitTime);
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ String key = getFirstLevelKey(i);
+ Assert.assertTrue(keyMap.containsKey(key), "should receive callbacks on " + key);
+ }
+
+ keyMap.clear();
+ for (int j = 1; j < secondLevelNr; j++)
+ {
+ int i = 0;
+ String nodeId = getNodeId(i, j);
+ String key = getSecondLevelKey(i, j);
+ record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ store.setProperty(key, record);
+ }
+ Thread.sleep(waitTime);
+ node0Key = getFirstLevelKey(0);
+ Assert.assertTrue(keyMap.containsKey(node0Key), "should receive callback on "
+ + node0Key);
+
+ // test unsubscribe
+ store.unsubscribeForPropertyChange("", listener);
+ // change all nodes and verify no notification happens
+ keyMap.clear();
+ setNodes(store, 'c', false);
+ Thread.sleep(waitTime);
+ Assert.assertEquals(keyMap.size(), 0);
+
+ // test getPropertyNames
+ List<String> names = store.getPropertyNames("");
+ int cnt = 0;
+ for (String name : names)
+ {
+ int i = cnt / 10;
+ int j = cnt % 10;
+ cnt++;
+ String key = getSecondLevelKey(i, j);
+ Assert.assertEquals(name, key);
+ }
+
+ // test compare and set
+ char[] updateData = new char[bufSize];
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = 'c';
+ updateData[i] = 'd';
+ }
+
+ Map<String, String> updateMap = new TreeMap<String, String>();
+ for (int i = 0; i < 10; i++)
+ {
+ map.put("key_" + i, new String(data));
+ updateMap.put("key_" + i, new String(updateData));
+ }
+
+ PropertyJsonComparator<ZNRecord> comparator =
+ new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ String key = getSecondLevelKey(i, j);
+
+ ZNRecord update = new ZNRecord(nodeId);
+ update.setSimpleFields(updateMap);
+ boolean succeed = store.compareAndSet(key, record, update, comparator);
+ Assert.assertTrue(succeed);
+ record = store.getProperty(key);
+ Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'd');
+ }
+ }
+
+ // test updateUntilSucceed
+ TestUpdater updater = new TestUpdater();
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String key = getSecondLevelKey(i, j);
+
+ store.updatePropertyUntilSucceed(key, updater);
+ record = store.getProperty(key);
+ Assert.assertEquals(record.getSimpleField("key_0").charAt(0), 'e');
+ }
+ }
+
+ // test exist
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String key = getSecondLevelKey(i, j);
+
+ boolean exist = store.exists(key);
+ Assert.assertTrue(exist);
+ }
+ }
+
+ // test removeProperty
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ int j = 0;
+
+ String key = getSecondLevelKey(i, j);
+
+ store.removeProperty(key);
+ record = store.getProperty(key);
+ Assert.assertNull(record);
+ }
+
+ // test removePropertyNamespace
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ String key = "/node_" + i;
+ store.removeNamespace(key);
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String key = getSecondLevelKey(i, j);
+
+ store.removeProperty(key);
+ record = store.getProperty(key);
+ Assert.assertNull(record);
+ }
+ }
+
+ System.out.println("END " + className + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /** Returns the ZooKeeper root path used by this test: "/" followed by the short class name. */
+ private String buildPropertyStoreRoot()
+ {
+   StringBuilder rootPath = new StringBuilder("/");
+   rootPath.append(className);
+   return rootPath.toString();
+ }
+
+ private void setNodes(ZKPropertyStore<ZNRecord> store, char c, boolean needTimestamp) throws PropertyStoreException
+ {
+ char[] data = new char[bufSize];
+
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = c;
+ }
+
+ Map<String, String> map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ ZNRecord record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ if (needTimestamp)
+ {
+ long now = System.currentTimeMillis();
+ record.setSimpleField("SetTimestamp", Long.toString(now));
+ }
+ String key = getSecondLevelKey(i, j);
+ store.setProperty(key, record);
+ }
+ }
+ }
+
+ private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp) throws PropertyStoreException
+ {
+ char[] data = new char[bufSize];
+
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = c;
+ }
+
+ Map<String, String> map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ ZNRecord record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ if (needTimestamp)
+ {
+ long now = System.currentTimeMillis();
+ record.setSimpleField("SetTimestamp", Long.toString(now));
+ }
+ String key = getSecondLevelKey(i, j);
+ zkClient.writeData(root + key, record);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
new file mode 100644
index 0000000..e1e9140
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZKPropertyStoreMultiThread.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKPropertyStoreMultiThread extends ZkUnitTestBase
+{
+ private static final Logger LOG = Logger.getLogger(TestZKPropertyStoreMultiThread.class);
+
+ // ZooKeeper root path for this test class: "/" followed by the short class name
+ private final String _root = "/" + getShortClassName();
+ // record (id "key1") that every worker writes as the initial value before updating it
+ private final ZNRecord _record = new ZNRecord("key1");
+
+
+ /** Removes whatever is stored under this test's root path, if the path exists. */
+ @BeforeClass()
+ public void beforeClass()
+ {
+   final boolean rootPresent = _gZkClient.exists(_root);
+   if (!rootPresent)
+   {
+     return;
+   }
+   _gZkClient.deleteRecursive(_root);
+ }
+
+ /**
+  * Worker that writes the initial record under "key1" and then retries a
+  * read / modify / compareAndSet cycle until its own "thread_<id>" simple field
+  * has been stored.
+  *
+  * Returns TRUE on success and FALSE if any exception was thrown (the exception is
+  * logged with its stack trace). The worker's private ZkClient is always closed.
+  */
+ public class TestCallableCAS implements Callable<Boolean>
+ {
+   @Override
+   public Boolean call()
+   {
+     ZkClient zkClient = null;
+     try
+     {
+       zkClient = new ZkClient(ZK_ADDR);
+       ZKPropertyStore<ZNRecord> store
+           = new ZKPropertyStore<ZNRecord>(zkClient,
+                                           new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                           _root);
+       PropertyJsonComparator<ZNRecord> comparator = new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+       long id = Thread.currentThread().getId();
+
+       store.setProperty("key1", _record);
+
+       boolean success;
+       do
+       {
+         // re-read on every attempt so the update is based on the latest stored value
+         ZNRecord current = store.getProperty("key1");
+         ZNRecord update = new ZNRecord(current);
+         update.setSimpleField("thread_" + id, "simpleValue");
+
+         success = store.compareAndSet("key1", current, update, comparator);
+       } while (!success);
+
+       return Boolean.TRUE;
+     }
+     catch (Exception e)
+     {
+       // pass the throwable separately so log4j records the stack trace
+       LOG.error("compareAndSet worker failed", e);
+       return Boolean.FALSE;
+     }
+     finally
+     {
+       // each worker owns its connection; release it whether or not the worker succeeded
+       if (zkClient != null)
+       {
+         zkClient.close();
+       }
+     }
+   }
+ }
+
+ /**
+  * Runs TestCallableCAS through TestHelper.startThreadsConcurrently(5, ..., 10) and
+  * expects five results, all of them true.
+  */
+ @Test ()
+ public void testCmpAndSet()
+ {
+   System.out.println("START testCmpAndSet at" + new Date(System.currentTimeMillis()));
+
+   Map<String, Boolean> outcomes = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableCAS(), 10);
+   Assert.assertEquals(outcomes.size(), 5);
+   for (Map.Entry<String, Boolean> outcome : outcomes.entrySet())
+   {
+     Assert.assertTrue(outcome.getValue().booleanValue());
+   }
+
+   System.out.println("END testCmpAndSet at" + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Updater that sets the simple field "thread_<id of the calling thread>" to
+  * "simpleValue" on the record it is given and returns that same record.
+  */
+ private class TestUpdater implements DataUpdater<ZNRecord>
+ {
+   @Override
+   public ZNRecord update(ZNRecord current)
+   {
+     final String fieldName = "thread_" + Thread.currentThread().getId();
+     current.setSimpleField(fieldName, "simpleValue");
+     return current;
+   }
+ }
+
+ /**
+  * Worker that writes the initial record under "key2" and then applies a
+  * TestUpdater through updatePropertyUntilSucceed.
+  *
+  * Returns TRUE on success and FALSE if any exception was thrown (the exception is
+  * logged with its stack trace). The worker's private ZkClient is always closed.
+  */
+ public class TestCallableUpdate implements Callable<Boolean>
+ {
+   @Override
+   public Boolean call()
+   {
+     ZkClient zkClient = null;
+     try
+     {
+       zkClient = new ZkClient(ZK_ADDR);
+       ZKPropertyStore<ZNRecord> store
+           = new ZKPropertyStore<ZNRecord>(zkClient,
+                                           new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                           _root);
+
+       store.setProperty("key2", _record);
+       store.updatePropertyUntilSucceed("key2", new TestUpdater());
+
+       return Boolean.TRUE;
+     }
+     catch (Exception e)
+     {
+       // pass the throwable separately so log4j records the stack trace
+       LOG.error("updatePropertyUntilSucceed worker failed", e);
+       return Boolean.FALSE;
+     }
+     finally
+     {
+       // each worker owns its connection; release it whether or not the worker succeeded
+       if (zkClient != null)
+       {
+         zkClient.close();
+       }
+     }
+   }
+ }
+
+ /**
+  * Runs TestCallableUpdate through TestHelper.startThreadsConcurrently(5, ..., 10) and
+  * expects five results, all of them true.
+  */
+ @Test()
+ public void testUpdateUntilSucceed()
+ {
+   System.out.println("START testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
+
+   Map<String, Boolean> outcomes = TestHelper.<Boolean>startThreadsConcurrently(5, new TestCallableUpdate(), 10);
+   Assert.assertEquals(outcomes.size(), 5);
+   for (Map.Entry<String, Boolean> outcome : outcomes.entrySet())
+   {
+     Assert.assertTrue(outcome.getValue().booleanValue());
+   }
+
+   System.out.println("END testUpdateUntilSucceed at" + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
new file mode 100644
index 0000000..1f6eaff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
@@ -0,0 +1,369 @@
+package org.apache.helix.store.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkHelixPropertyStore extends ZkUnitTestBase
+{
+ // ZooKeeper root path for this test class: "/" followed by the short class name
+ final String _root = "/" + getShortClassName();
+ // number of characters in every simple-field value written by setNodes()
+ final int bufSize = 128;
+ // number of "key_<n>" simple fields put into each record
+ final int mapNr = 10;
+ // number of first-level keys ("/node_<i>")
+ final int firstLevelNr = 10;
+ // number of second-level keys under each first-level key
+ final int secondLevelNr = 10;
+
+ // final int totalNodes = firstLevelNr * secondLevelNr;
+
+ /**
+  * Listener that records, separately for change / create / delete notifications,
+  * the path notified and the wall-clock time (epoch milliseconds) of the latest
+  * notification for that path.
+  *
+  * The maps are synchronized because the tests read them from the test thread after
+  * sleeping for callbacks that arrive asynchronously.
+  */
+ class TestListener implements HelixPropertyListener
+ {
+   final Map<String, Long> _changeKeys =
+       java.util.Collections.synchronizedMap(new HashMap<String, Long>());
+   final Map<String, Long> _createKeys =
+       java.util.Collections.synchronizedMap(new HashMap<String, Long>());
+   final Map<String, Long> _deleteKeys =
+       java.util.Collections.synchronizedMap(new HashMap<String, Long>());
+
+   /** Forgets every notification recorded so far. */
+   public void reset()
+   {
+     _changeKeys.clear();
+     _createKeys.clear();
+     _deleteKeys.clear();
+   }
+
+   @Override
+   public void onDataChange(String path)
+   {
+     _changeKeys.put(path, Long.valueOf(System.currentTimeMillis()));
+   }
+
+   @Override
+   public void onDataCreate(String path)
+   {
+     _createKeys.put(path, Long.valueOf(System.currentTimeMillis()));
+   }
+
+   @Override
+   public void onDataDelete(String path)
+   {
+     _deleteKeys.put(path, Long.valueOf(System.currentTimeMillis()));
+   }
+ }
+
+ /**
+  * Writes all nodes through the store, reads each one back and checks its id, then
+  * times 1000 repeated gets of one key and requires them to finish in under 60ms.
+  * The store is stopped even when an assertion fails.
+  */
+ @Test
+ public void testSet()
+ {
+   System.out.println("START testSet() at " + new Date(System.currentTimeMillis()));
+
+   String subRoot = _root + "/" + "set";
+   List<String> subscribedPaths = new ArrayList<String>();
+   subscribedPaths.add(subRoot);
+   ZkHelixPropertyStore<ZNRecord> store =
+       new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                          subRoot,
+                                          subscribedPaths);
+   try
+   {
+     setNodes(store, 'a', false);
+     for (int i = 0; i < firstLevelNr; i++)
+     {
+       for (int j = 0; j < secondLevelNr; j++)
+       {
+         String nodeId = getNodeId(i, j);
+         String key = getSecondLevelKey(i, j);
+         ZNRecord record = store.get(key, null, 0);
+         Assert.assertEquals(record.getId(), nodeId);
+       }
+     }
+
+     long startT = System.currentTimeMillis();
+     for (int i = 0; i < 1000; i++)
+     {
+       ZNRecord record = store.get("/node_0/childNode_0_0", null, 0);
+       Assert.assertNotNull(record);
+     }
+     long endT = System.currentTimeMillis();
+     System.out.println("1000 Get() time used: " + (endT - startT) + "ms");
+     // the message now states the bound that is actually enforced
+     Assert.assertTrue((endT - startT) < 60, "1000 Gets should be finished within 60ms");
+   }
+   finally
+   {
+     store.stop();
+   }
+   System.out.println("END testSet() at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Verifies create, change and delete notifications for modifications made through
+  * the store itself. After each batch of operations the test sleeps 500ms and then
+  * checks the number of distinct paths the listener was notified about.
+  * The store is stopped even when an assertion fails.
+  *
+  * @throws Exception if a sleep is interrupted
+  */
+ @Test
+ public void testLocalTriggeredCallback() throws Exception
+ {
+   System.out.println("START testLocalTriggeredCallback() at "
+       + new Date(System.currentTimeMillis()));
+
+   String subRoot = _root + "/" + "localCallback";
+   List<String> subscribedPaths = new ArrayList<String>();
+   subscribedPaths.add(subRoot);
+   ZkHelixPropertyStore<ZNRecord> store =
+       new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                          subRoot,
+                                          subscribedPaths);
+   try
+   {
+     // change nodes via property store interface
+     // and verify all notifications have been received
+     TestListener listener = new TestListener();
+     store.subscribe("/", listener);
+
+     // test dataCreate callbacks
+     listener.reset();
+     setNodes(store, 'a', true);
+
+     // wait until all callbacks have been received
+     Thread.sleep(500);
+     // root + first-level nodes + second-level nodes
+     int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertEquals(listener._createKeys.size(), expectCreateNodes, "Should receive "
+         + expectCreateNodes + " create callbacks");
+
+     // test dataChange callbacks
+     listener.reset();
+     setNodes(store, 'b', true);
+
+     // wait until all callbacks have been received
+     Thread.sleep(500);
+     int expectChangeNodes = firstLevelNr * secondLevelNr;
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
+                       "Should receive at least " + expectChangeNodes
+                           + " change callbacks");
+
+     // test delete callbacks
+     listener.reset();
+     int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+     store.remove("/", 0);
+     // wait until all callbacks have been received
+     Thread.sleep(500);
+
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertEquals(listener._deleteKeys.size(), expectDeleteNodes, "Should receive "
+         + expectDeleteNodes + " delete callbacks");
+   }
+   finally
+   {
+     store.stop();
+   }
+   System.out.println("END testLocalTriggeredCallback() at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Verifies create, change and delete notifications for modifications made directly
+  * with the shared ZkClient, bypassing the store. After each batch of operations the
+  * test sleeps and then checks the number of distinct paths the listener was
+  * notified about. The store is stopped even when an assertion fails.
+  *
+  * @throws Exception if a sleep is interrupted
+  */
+ @Test
+ public void testZkTriggeredCallback() throws Exception
+ {
+   System.out.println("START testZkTriggeredCallback() at "
+       + new Date(System.currentTimeMillis()));
+
+   String subRoot = _root + "/" + "zkCallback";
+   List<String> subscribedPaths = Arrays.asList(subRoot);
+   ZkHelixPropertyStore<ZNRecord> store =
+       new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                          subRoot,
+                                          subscribedPaths);
+   try
+   {
+     // change nodes via the native zkclient interface
+     // and verify all notifications have been received
+     TestListener listener = new TestListener();
+     store.subscribe("/", listener);
+
+     // test create callbacks
+     listener.reset();
+     setNodes(_gZkClient, subRoot, 'a', true);
+     // root + first-level nodes + second-level nodes
+     int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+     Thread.sleep(500);
+
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertEquals(listener._createKeys.size(), expectCreateNodes, "Should receive "
+         + expectCreateNodes + " create callbacks");
+
+     // test change callbacks
+     listener.reset();
+     setNodes(_gZkClient, subRoot, 'b', true);
+     int expectChangeNodes = firstLevelNr * secondLevelNr;
+     Thread.sleep(500);
+
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
+                       "Should receive at least " + expectChangeNodes
+                           + " change callbacks");
+
+     // test delete callbacks
+     listener.reset();
+     int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
+     _gZkClient.deleteRecursive(subRoot);
+     Thread.sleep(1000);
+
+     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
+         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
+     Assert.assertEquals(listener._deleteKeys.size(), expectDeleteNodes, "Should receive "
+         + expectDeleteNodes + " delete callbacks");
+   }
+   finally
+   {
+     store.stop();
+   }
+   System.out.println("END testZkTriggeredCallback() at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Deletes and re-creates "/child0" twice in a row directly in ZooKeeper and checks
+  * that a subsequent store read returns the latest created record; then deletes the
+  * node and checks that a read with THROW_EXCEPTION_IFNOTEXIST throws
+  * ZkNoNodeException. The store is stopped even when an assertion fails.
+  *
+  * @throws Exception if a sleep is interrupted
+  */
+ @Test
+ public void testBackToBackRemoveAndSet() throws Exception
+ {
+   System.out.println("START testBackToBackRemoveAndSet() at "
+       + new Date(System.currentTimeMillis()));
+
+   String subRoot = _root + "/" + "backToBackRemoveAndSet";
+   List<String> subscribedPaths = new ArrayList<String>();
+   subscribedPaths.add(subRoot);
+   ZkHelixPropertyStore<ZNRecord> store =
+       new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
+                                          subRoot,
+                                          subscribedPaths);
+   try
+   {
+     store.set("/child0", new ZNRecord("child0"), AccessOption.PERSISTENT);
+
+     ZNRecord record = store.get("/child0", null, 0); // will put the record in cache
+     Assert.assertEquals(record.getId(), "child0");
+
+     String child0Path = subRoot + "/child0";
+     for (int i = 0; i < 2; i++)
+     {
+       _gZkClient.delete(child0Path);
+       _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new-" + i));
+     }
+
+     Thread.sleep(500); // should wait for zk callback to add "/child0" into cache
+     record = store.get("/child0", null, 0);
+     Assert.assertEquals(record.getId(),
+                         "child0-new-1",
+                         "Cache shoulde be updated to latest create");
+
+     _gZkClient.delete(child0Path);
+     Thread.sleep(500); // should wait for zk callback to remove "/child0" from cache
+     try
+     {
+       record = store.get("/child0", null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+       Assert.fail("/child0 should have been removed");
+     }
+     catch (ZkNoNodeException e)
+     {
+       // OK.
+     }
+   }
+   finally
+   {
+     store.stop();
+   }
+   System.out.println("END testBackToBackRemoveAndSet() at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ private String getNodeId(int i, int j)
+ {
+ return "childNode_" + i + "_" + j;
+ }
+
+ private String getSecondLevelKey(int i, int j)
+ {
+ return "/node_" + i + "/" + getNodeId(i, j);
+ }
+
+ private String getFirstLevelKey(int i)
+ {
+ return "/node_" + i;
+ }
+
+ private void setNodes(ZkHelixPropertyStore<ZNRecord> store,
+ char c,
+ boolean needTimestamp)
+ {
+ char[] data = new char[bufSize];
+
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = c;
+ }
+
+ Map<String, String> map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ ZNRecord record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ if (needTimestamp)
+ {
+ long now = System.currentTimeMillis();
+ record.setSimpleField("SetTimestamp", Long.toString(now));
+ }
+ String key = getSecondLevelKey(i, j);
+ store.set(key, record, AccessOption.PERSISTENT);
+ }
+ }
+ }
+
+ private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp)
+ {
+ char[] data = new char[bufSize];
+
+ for (int i = 0; i < bufSize; i++)
+ {
+ data[i] = c;
+ }
+
+ Map<String, String> map = new TreeMap<String, String>();
+ for (int i = 0; i < mapNr; i++)
+ {
+ map.put("key_" + i, new String(data));
+ }
+
+ for (int i = 0; i < firstLevelNr; i++)
+ {
+ String firstLevelKey = getFirstLevelKey(i);
+
+ for (int j = 0; j < secondLevelNr; j++)
+ {
+ String nodeId = getNodeId(i, j);
+ ZNRecord record = new ZNRecord(nodeId);
+ record.setSimpleFields(map);
+ if (needTimestamp)
+ {
+ long now = System.currentTimeMillis();
+ record.setSimpleField("SetTimestamp", Long.toString(now));
+ }
+ String key = getSecondLevelKey(i, j);
+ try
+ {
+ zkClient.writeData(root + key, record);
+ }
+ catch (ZkNoNodeException e)
+ {
+ zkClient.createPersistent(root + firstLevelKey, true);
+ zkClient.createPersistent(root + key, record);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
new file mode 100644
index 0000000..03ff074
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkPropertyStoreSessionExpiry.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that a listener subscribed on a ZKPropertyStore keeps receiving property
+ * change callbacks after the underlying ZooKeeper session has been expired.
+ */
+public class TestZkPropertyStoreSessionExpiry extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestZkPropertyStoreSessionExpiry.class);
+
+  /**
+   * Listener that only records that at least one change callback was delivered.
+   */
+  private class TestPropertyChangeListener
+    implements PropertyChangeListener<String>
+  {
+    // volatile: the test thread sleeps and then reads this flag, so the callback is
+    // presumably delivered on a different thread; the write must be visible to the reader.
+    public volatile boolean _propertyChangeReceived = false;
+
+    @Override
+    public void onPropertyChange(String key)
+    {
+      LOG.info("property change, " + key);
+      _propertyChangeReceived = true;
+    }
+  }
+
+  // Client used only to clean up the property-store root before the test.
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass()
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+
+  /**
+   * Subscribes a listener at the store root, verifies a callback arrives for a write,
+   * expires the session of the store's connection, and verifies a callback still
+   * arrives for a subsequent write.
+   */
+  @Test()
+  public void testZkPropertyStoreSessionExpiry() throws Exception
+  {
+    LOG.info("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    PropertyJsonSerializer<String> serializer = new PropertyJsonSerializer<String>(String.class);
+
+    ZkConnection zkConn = new ZkConnection(ZK_ADDR);
+
+    // Start from a clean root so earlier runs cannot influence this one.
+    final String propertyStoreRoot = "/" + getShortClassName();
+    if (_zkClient.exists(propertyStoreRoot))
+    {
+      _zkClient.deleteRecursive(propertyStoreRoot);
+    }
+
+    // Keep a handle on the store's client so it can be closed when the test ends;
+    // previously this client was created inline and never released.
+    ZkClient storeZkClient = new ZkClient(zkConn);
+    try
+    {
+      ZKPropertyStore<String> zkPropertyStore
+        = new ZKPropertyStore<String>(storeZkClient, serializer, propertyStoreRoot);
+
+      zkPropertyStore.setProperty("/child1/grandchild1", "grandchild1");
+      zkPropertyStore.setProperty("/child1/grandchild2", "grandchild2");
+
+      TestPropertyChangeListener listener = new TestPropertyChangeListener();
+      zkPropertyStore.subscribeForPropertyChange("", listener);
+
+      // A write before session expiry must reach the listener.
+      listener._propertyChangeReceived = false;
+      zkPropertyStore.setProperty("/child2/grandchild3", "grandchild3");
+      Thread.sleep(100);
+      AssertJUnit.assertTrue("no property change received before session expiry",
+                             listener._propertyChangeReceived);
+
+      simulateSessionExpiry(zkConn);
+
+      // A write after session expiry must reach the listener as well.
+      listener._propertyChangeReceived = false;
+      zkPropertyStore.setProperty("/child2/grandchild4", "grandchild4");
+      Thread.sleep(100);
+      AssertJUnit.assertTrue("no property change received after session expiry",
+                             listener._propertyChangeReceived);
+    }
+    finally
+    {
+      storeZkClient.close();
+    }
+
+    LOG.info("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
new file mode 100644
index 0000000..6c0c8c4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -0,0 +1,494 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestClusterSetup extends ZkUnitTestBase
+{
+ private static Logger LOG =
+ Logger.getLogger(TestClusterSetup.class);
+
+ protected static final String CLUSTER_NAME = "TestClusterSetup";
+ protected static final String TEST_DB = "TestDB";
+ protected static final String INSTANCE_PREFIX = "instance:";
+ protected static final String STATE_MODEL = "MasterSlave";
+ protected static final String TEST_NODE = "testnode:1";
+
+ ZkClient _zkClient;
+ ClusterSetup _clusterSetup;
+
+ String instanceColonToUnderscoreFormat(String colonFormat)
+ {
+ int lastPos = colonFormat.lastIndexOf(":");
+ if (lastPos <= 0)
+ {
+ String error = "Invalid storage Instance info format: " + colonFormat;
+ LOG.warn(error);
+ throw new HelixException(error);
+ }
+ String host = colonFormat.substring(0, lastPos);
+ String portStr = colonFormat.substring(lastPos + 1);
+ return host + "_" + portStr;
+ }
+
+ private static String[] createArgs(String str)
+ {
+ String[] split = str.split("[ ]+");
+ System.out.println(Arrays.toString(split));
+ return split;
+ }
+
+ /**
+  * Announces the start of the class on standard output and opens the ZkClient
+  * (with a ZNRecordSerializer) that the tests use to inspect ZooKeeper.
+  */
+ @BeforeClass()
+ public void beforeClass() throws IOException,
+ Exception
+ {
+   final Date startedAt = new Date(System.currentTimeMillis());
+   System.out.println("START TestClusterSetup.beforeClass() " + startedAt);
+
+   _zkClient = new ZkClient(ZK_ADDR);
+   _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ /**
+  * Closes the shared ZkClient and announces the end of the class on standard output.
+  */
+ @AfterClass()
+ public void afterClass()
+ {
+   _zkClient.close();
+
+   final Date finishedAt = new Date(System.currentTimeMillis());
+   System.out.println("END TestClusterSetup.afterClass() " + finishedAt);
+ }
+
+ /**
+  * Runs before each test method: removes the cluster's znode tree, then creates a
+  * new ClusterSetup and adds the cluster again.
+  */
+ @BeforeMethod()
+ public void setup()
+ {
+   final String clusterRoot = "/" + CLUSTER_NAME;
+   _zkClient.deleteRecursive(clusterRoot);
+
+   _clusterSetup = new ClusterSetup(ZK_ADDR);
+   _clusterSetup.addCluster(CLUSTER_NAME, true);
+ }
+
+ /**
+  * Adds three instances in bulk and one individually, verifying each in ZooKeeper,
+  * then checks that re-adding an existing instance and adding a badly formatted
+  * instance name are both rejected with a HelixException.
+  */
+ @Test()
+ public void testAddInstancesToCluster() throws Exception
+ {
+   final int bulkCount = 3;
+   String[] bulkInstances = new String[bulkCount];
+   for (int idx = 0; idx < bulkCount; idx++)
+   {
+     bulkInstances[idx] = INSTANCE_PREFIX + idx;
+   }
+   String singleInstance = INSTANCE_PREFIX + bulkCount;
+
+   // bulk add, then verify every instance
+   _clusterSetup.addInstancesToCluster(CLUSTER_NAME, bulkInstances);
+   for (String added : bulkInstances)
+   {
+     String underscoreName = instanceColonToUnderscoreFormat(added);
+     verifyInstance(_zkClient, CLUSTER_NAME, underscoreName, true);
+   }
+
+   // single add
+   _clusterSetup.addInstanceToCluster(CLUSTER_NAME, singleInstance);
+   verifyInstance(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(singleInstance),
+                  true);
+
+   // adding the same instance again must be rejected
+   boolean rejected;
+   try
+   {
+     _clusterSetup.addInstanceToCluster(CLUSTER_NAME, singleInstance);
+     rejected = false;
+   }
+   catch (HelixException e)
+   {
+     rejected = true;
+   }
+   AssertJUnit.assertTrue(rejected);
+
+   // an instance name without a colon must be rejected
+   try
+   {
+     _clusterSetup.addInstanceToCluster(CLUSTER_NAME, "badinstance");
+     rejected = false;
+   }
+   catch (HelixException e)
+   {
+     rejected = true;
+   }
+   AssertJUnit.assertTrue(rejected);
+ }
+
+ /**
+  * Exercises the disable / drop life cycle of an instance: dropping an enabled
+  * instance is rejected, dropping works once the instance is disabled, dropping it a
+  * second time is rejected, and badly formatted instance names are rejected by both
+  * enableInstance and dropInstanceFromCluster.
+  */
+ @Test()
+ public void testDisableDropInstancesFromCluster() throws Exception
+ {
+   // Populate the cluster with instance:0 .. instance:3 first.
+   testAddInstancesToCluster();
+   String nextInstanceAddress = INSTANCE_PREFIX + 3;
+
+   boolean caughtException = false;
+   // drop without disabling
+   try
+   {
+     _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+   }
+   catch (HelixException e)
+   {
+     caughtException = true;
+   }
+   AssertJUnit.assertTrue(caughtException);
+
+   // disable
+   _clusterSetup.getClusterManagementTool()
+                .enableInstance(CLUSTER_NAME,
+                                instanceColonToUnderscoreFormat(nextInstanceAddress),
+                                false);
+   verifyEnabled(_zkClient,
+                 CLUSTER_NAME,
+                 instanceColonToUnderscoreFormat(nextInstanceAddress),
+                 false);
+
+   // drop
+   _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+   verifyInstance(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(nextInstanceAddress),
+                  false);
+
+   // re-drop
+   caughtException = false;
+   try
+   {
+     _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
+   }
+   catch (HelixException e)
+   {
+     caughtException = true;
+   }
+   AssertJUnit.assertTrue(caughtException);
+
+   // TODO: also cover disabling and dropping a set of instances in one call
+   // (enableInstances / dropInstancesFromCluster); that scenario was previously
+   // only present as commented-out code.
+
+   // bad format disable, drop
+   String badFormatInstance = "badinstance";
+   caughtException = false;
+   try
+   {
+     _clusterSetup.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+                                                             badFormatInstance,
+                                                             false);
+   }
+   catch (HelixException e)
+   {
+     caughtException = true;
+   }
+   AssertJUnit.assertTrue(caughtException);
+
+   caughtException = false;
+   try
+   {
+     _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, badFormatInstance);
+   }
+   catch (HelixException e)
+   {
+     caughtException = true;
+   }
+   AssertJUnit.assertTrue(caughtException);
+ }
+
+ /**
+  * Adds TEST_DB (16 partitions, STATE_MODEL) to the cluster on a best-effort basis
+  * and verifies that the resource exists afterwards.
+  *
+  * A failure of the add call is tolerated because this method is also invoked from
+  * testRebalanceCluster() after setupTestCluster(); presumably the resource may
+  * already exist there. The failure is logged instead of being silently dropped.
+  */
+ @Test()
+ public void testAddResource() throws Exception
+ {
+   try
+   {
+     _clusterSetup.addResourceToCluster(CLUSTER_NAME, TEST_DB, 16, STATE_MODEL);
+   }
+   catch (Exception e)
+   {
+     // Deliberately non-fatal; verifyResource below decides the outcome.
+     LOG.info("Ignoring failure while adding resource " + TEST_DB + " to " + CLUSTER_NAME, e);
+   }
+   verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+ }
+
+ /**
+  * Sets up the test cluster, checks that TEST_DB is present, drops it, and checks
+  * that it is gone.
+  */
+ @Test()
+ public void testRemoveResource() throws Exception
+ {
+   final String cluster = CLUSTER_NAME;
+   final String resource = TEST_DB;
+
+   _clusterSetup.setupTestCluster(cluster);
+   verifyResource(_zkClient, cluster, resource, true);
+
+   _clusterSetup.dropResourceFromCluster(cluster, resource);
+   verifyResource(_zkClient, cluster, resource, false);
+ }
+
+ /**
+  * Sets up the test cluster, makes sure TEST_DB exists, rebalances it with four
+  * replicas and verifies the resulting replication.
+  */
+ @Test()
+ public void testRebalanceCluster() throws Exception
+ {
+   final int replicas = 4;
+
+   _clusterSetup.setupTestCluster(CLUSTER_NAME);
+   // testAddInstancesToCluster();
+   testAddResource();
+
+   _clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, replicas);
+   verifyReplication(_zkClient, CLUSTER_NAME, TEST_DB, replicas);
+ }
+
+ /*
+ * @Test (groups = {"unitTest"}) public void testPrintUsage() throws Exception { Options
+ * cliOptions = ClusterSetup.constructCommandLineOptions();
+ * ClusterSetup.printUsage(null); }
+ */
+
+ /**
+  * Drives ClusterSetup through its command-line entry point: adds a cluster, adds a
+  * node, adds a resource (best effort), enables and disables the node, and finally
+  * drops it, verifying the ZooKeeper state after each relevant step.
+  */
+ @Test()
+ public void testParseCommandLinesArgs() throws Exception
+ {
+   // ClusterSetup
+   // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+ " help"));
+
+   // wipe ZK
+   _zkClient.deleteRecursive("/" + CLUSTER_NAME);
+   _clusterSetup = new ClusterSetup(ZK_ADDR);
+
+   ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addCluster "
+       + CLUSTER_NAME));
+
+   // wipe again
+   _zkClient.deleteRecursive("/" + CLUSTER_NAME);
+   _clusterSetup = new ClusterSetup(ZK_ADDR);
+
+   _clusterSetup.setupTestCluster(CLUSTER_NAME);
+
+   ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addNode "
+       + CLUSTER_NAME + " " + TEST_NODE));
+   verifyInstance(_zkClient,
+                  CLUSTER_NAME,
+                  instanceColonToUnderscoreFormat(TEST_NODE),
+                  true);
+   try
+   {
+     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+         + " --addResource " + CLUSTER_NAME + " " + TEST_DB + " 4 " + STATE_MODEL));
+   }
+   catch (Exception e)
+   {
+     // Best effort: presumably setupTestCluster() may already have created the
+     // resource. Log the failure rather than dropping it; verifyResource below
+     // decides the outcome.
+     LOG.info("Ignoring failure of --addResource for " + TEST_DB, e);
+   }
+   verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+   // ClusterSetup
+   // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --addNode node-1"));
+   ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+       + " --enableInstance " + CLUSTER_NAME + " "
+       + instanceColonToUnderscoreFormat(TEST_NODE) + " true"));
+   verifyEnabled(_zkClient,
+                 CLUSTER_NAME,
+                 instanceColonToUnderscoreFormat(TEST_NODE),
+                 true);
+
+   // TODO: verify list commands
+   /*
+    * ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusterInfo "
+    * +CLUSTER_NAME)); ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusters"));
+    * ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listInstanceInfo "
+    * +CLUSTER_NAME+" "+instanceColonToUnderscoreFormat(TEST_NODE))); ClusterSetup
+    * .processCommandLineArgs
+    * (createArgs("-zkSvr "+ZK_ADDR+" --listInstances "+CLUSTER_NAME)); ClusterSetup
+    * .processCommandLineArgs
+    * (createArgs("-zkSvr "+ZK_ADDR+" --listResourceInfo "+CLUSTER_NAME +" "+TEST_DB));
+    * ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listResources "
+    * +CLUSTER_NAME)); ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listStateModel "
+    * +CLUSTER_NAME+" "+STATE_MODEL)); ClusterSetup
+    * .processCommandLineArgs(createArgs("-zkSvr "
+    * +ZK_ADDR+" --listStateModels "+CLUSTER_NAME));
+    */
+   // ClusterSetup
+   // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --rebalance "+CLUSTER_NAME+" "+TEST_DB+" 1"));
+   ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
+       + " --enableInstance " + CLUSTER_NAME + " "
+       + instanceColonToUnderscoreFormat(TEST_NODE) + " false"));
+   verifyEnabled(_zkClient,
+                 CLUSTER_NAME,
+                 instanceColonToUnderscoreFormat(TEST_NODE),
+                 false);
+   // The node is disabled at this point, so it can be dropped.
+   ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --dropNode "
+       + CLUSTER_NAME + " " + TEST_NODE));
+ }
+
+ /**
+  * Sets two properties on a participant scope through ClusterSetup.setConfig and
+  * checks that ClusterSetup.getConfig returns them in the same "k=v,k=v" form.
+  */
+ @Test()
+ public void testSetGetConfig() throws Exception
+ {
+   System.out.println("START testSetGetConfig() " + new Date(System.currentTimeMillis()));
+
+   // basic
+   final String scope = "CLUSTER=" + CLUSTER_NAME + ",PARTICIPANT=localhost_0";
+   final String expected = "key1=value1,key2=value2";
+
+   _clusterSetup.setConfig(scope, expected);
+   final String actual = _clusterSetup.getConfig(scope, "key1,key2");
+   Assert.assertEquals(actual, expected);
+
+   System.out.println("END testSetGetConfig() " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Pauses and resumes a freshly set up cluster through the --enableCluster command
+  * and checks that the pause node is created and removed accordingly.
+  */
+ @Test
+ public void testEnableCluster() throws Exception
+ {
+   // Logger.getRootLogger().setLevel(Level.INFO);
+   String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+
+   System.out.println("START " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+
+   TestHelper.setupCluster(clusterName,
+                           ZK_ADDR,
+                           12918, // participant port
+                           "localhost", // participant name prefix
+                           "TestDB", // resource name prefix
+                           1, // resources
+                           10, // partitions per resource
+                           5, // number of nodes
+                           3, // replicas
+                           "MasterSlave",
+                           true); // do rebalance
+
+   // pause cluster
+   String[] pauseArgs = { "--zkSvr", ZK_ADDR, "--enableCluster", clusterName, "false" };
+   ClusterSetup.processCommandLineArgs(pauseArgs);
+
+   Builder keyBuilder = new Builder(clusterName);
+   boolean pauseNodePresent = _gZkClient.exists(keyBuilder.pause().getPath());
+   Assert.assertTrue(pauseNodePresent, "pause node under controller should be created");
+
+   // resume cluster
+   String[] resumeArgs = { "--zkSvr", ZK_ADDR, "--enableCluster", clusterName, "true" };
+   ClusterSetup.processCommandLineArgs(resumeArgs);
+
+   pauseNodePresent = _gZkClient.exists(keyBuilder.pause().getPath());
+   Assert.assertFalse(pauseNodePresent, "pause node under controller should be removed");
+
+   System.out.println("END " + clusterName + " at "
+       + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+  * Checks the preconditions of the --dropNode command for localhost:12918: the drop
+  * must fail while a live-instance entry exists, must fail while the instance is
+  * still enabled, and must succeed (removing the instance config and the instance
+  * path) once the instance has been disabled.
+  */
+ @Test
+ public void testDropInstance() throws Exception
+ {
+ // derive a cluster name unique to this test class and method
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // add fake liveInstance
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = new Builder(clusterName);
+ LiveInstance liveInstance = new LiveInstance("localhost_12918");
+ liveInstance.setSessionId("session_0");
+ liveInstance.setHelixVersion("version_0");
+ accessor.setProperty(keyBuilder.liveInstance("localhost_12918"), liveInstance);
+
+ // drop without stop the process, should throw exception
+ // (Assert.fail signals through an AssertionError, which is an Error, so the
+ // catch (Exception) below does not mask a missing exception.)
+ try
+ {
+ ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+ "--dropNode", clusterName, "localhost:12918" });
+ Assert.fail("Should throw exception since localhost_12918 is still in LIVEINSTANCES/");
+ }
+ catch (Exception e)
+ {
+ // OK
+ }
+ // remove the fake live-instance entry again
+ accessor.removeProperty(keyBuilder.liveInstance("localhost_12918"));
+
+ // drop without disable, should throw exception
+ try
+ {
+ ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+ "--dropNode", clusterName, "localhost:12918" });
+ Assert.fail("Should throw exception since localhost_12918 is enabled");
+ }
+ catch (Exception e)
+ {
+ // e.printStackTrace();
+ // OK
+ }
+
+ // disable the instance first, then drop it
+ ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
+ "--enableInstance", clusterName, "localhost_12918", "false" });
+ ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR, "--dropNode",
+ clusterName, "localhost:12918" });
+
+ // both the instance config and the instance path must be gone
+ Assert.assertNull(accessor.getProperty(keyBuilder.instanceConfig("localhost_12918")),
+ "Instance config should be dropped");
+ Assert.assertFalse(_gZkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
+ clusterName,
+ "localhost_12918")),
+ "Instance/host should be dropped");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
new file mode 100644
index 0000000..9b9fcb0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosCli.java
@@ -0,0 +1,423 @@
+package org.apache.helix.tools;
+/*
+ * Simulate all the admin tasks needed by using command line tool
+ *
+ * */
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHelixAdminScenariosCli extends ZkIntegrationTestBase
+{
+ Map<String, StartCMResult> _startCMResultMap =
+ new HashMap<String, StartCMResult>();
+
+ /**
+  * Serializes an object to an indented JSON string using a fresh Jackson ObjectMapper.
+  *
+  * @param object the value to serialize
+  * @return the JSON text, written with INDENT_OUTPUT enabled
+  * @throws JsonGenerationException if thrown by the mapper while writing
+  * @throws JsonMappingException if thrown by the mapper while writing
+  * @throws IOException if thrown by the mapper while writing
+  */
+ public static String ObjectToJson(Object object) throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+   ObjectMapper jsonMapper = new ObjectMapper();
+   jsonMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+   StringWriter buffer = new StringWriter();
+   jsonMapper.writeValue(buffer, object);
+   return buffer.toString();
+ }
+
+
+ /**
+  * Single entry point of this class: runs the admin scenarios in a fixed order.
+  * The steps are order dependent; each one relies on the clusters, resources,
+  * instances and started processes left behind by the steps before it.
+  */
+ @Test
+ public void testAddDeleteClusterAndInstanceAndResource() throws Exception
+ {
+ // Helix bug helix-102
+ //ZKPropertyTransferServer.PERIOD = 500;
+ //ZkPropertyTransferClient.SEND_PERIOD = 500;
+ //ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
+
+ /**======================= Add clusters ==============================*/
+
+ testAddCluster();
+
+ /**================= Add / drop some resources ===========================*/
+
+ testAddResource();
+
+ /**====================== Add / delete instances ===========================*/
+
+ testAddInstance();
+
+ /**===================== Rebalance resource ===========================*/
+
+ testRebalanceResource();
+
+ /**==================== start the clusters =============================*/
+
+ testStartCluster();
+
+ /**==================== drop add resource in live clusters ===================*/
+ testDropAddResource();
+ /**======================Operations with live node ============================*/
+
+ testInstanceOperations();
+
+ /**============================ expand cluster ===========================*/
+
+ testExpandCluster();
+
+ /**============================ deactivate cluster ===========================*/
+ testDeactivateCluster();
+
+ }
+
+ /**
+  * Runs a ClusterSetup command line (split on single spaces) and asserts that it
+  * fails with an exception.
+  *
+  * @param command the space-separated command line to run
+  */
+ void assertClusterSetupException(String command)
+ {
+   Exception failure = null;
+   try
+   {
+     ClusterSetup.processCommandLineArgs(command.split(" "));
+   }
+   catch (Exception e)
+   {
+     failure = e;
+   }
+   Assert.assertTrue(failure != null);
+ }
+
+ /**
+  * Scenario step: exercises -addCluster and -dropCluster through the command line.
+  * Not annotated as a test; it is invoked from
+  * testAddDeleteClusterAndInstanceAndResource(). When it returns, clusterTest and
+  * \ClusterTest have been dropped again, clusterTest1 has been added, and no drop
+  * was issued for Klazt3rz.
+  */
+ public void testAddCluster() throws Exception
+ {
+ String command = "--zkSvr localhost:2183 -addCluster clusterTest";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // malformed cluster name
+ command = "--zkSvr localhost:2183 -addCluster /ClusterTest";
+ assertClusterSetupException(command);
+
+ // Add the grand cluster
+ // NOTE(review): the argument carries a leading double quote, yet the cluster is
+ // checked below under the name Klazt3rz; presumably the command-line parser
+ // strips the quote. Confirm.
+ command = "--zkSvr localhost:2183 -addCluster \"Klazt3rz";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // a cluster name starting with a backslash is accepted
+ command = "--zkSvr localhost:2183 -addCluster \\ClusterTest";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // Add already exist cluster
+ command = "--zkSvr localhost:2183 -addCluster clusterTest";
+ assertClusterSetupException(command);
+
+ // all three clusters added above must now be set up
+ Assert.assertTrue(ZKUtil.isClusterSetup("Klazt3rz", _gZkClient));
+ Assert.assertTrue(ZKUtil.isClusterSetup("clusterTest", _gZkClient));
+ Assert.assertTrue(ZKUtil.isClusterSetup("\\ClusterTest", _gZkClient));
+
+ // delete cluster without resource and instance
+ command = "-zkSvr localhost:2183 -dropCluster \\ClusterTest";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // NOTE(review): clusterTest1 has not been added by this method at this point;
+ // the call is evidently expected not to throw.
+ command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ command = "-zkSvr localhost:2183 -dropCluster clusterTest";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ Assert.assertFalse(_gZkClient.exists("/clusterTest"));
+ Assert.assertFalse(_gZkClient.exists("/\\ClusterTest"));
+ Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
+
+ // add clusterTest1, which the later scenario steps operate on
+ command = "-zkSvr localhost:2183 -addCluster clusterTest1";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+
+ /**
+  * Scenario step: adds resources db_22 and db_11 to clusterTest1, checks that adding
+  * db_22 a second time is rejected, then drops and re-adds db_11.
+  */
+ public void testAddResource() throws Exception
+ {
+   ClusterSetup.processCommandLineArgs(
+       "-zkSvr localhost:2183 -addResource clusterTest1 db_22 144 MasterSlave".split(" "));
+   ClusterSetup.processCommandLineArgs(
+       "-zkSvr localhost:2183 -addResource clusterTest1 db_11 44 MasterSlave".split(" "));
+
+   // Add duplicate resource
+   assertClusterSetupException(
+       "-zkSvr localhost:2183 -addResource clusterTest1 db_22 55 OnlineOffline");
+
+   // drop resource now, then add it back
+   ClusterSetup.processCommandLineArgs(
+       "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ".split(" "));
+   ClusterSetup.processCommandLineArgs(
+       "-zkSvr localhost:2183 -addResource clusterTest1 db_11 44 MasterSlave".split(" "));
+ }
+
+ /**
+  * Scenario step: deactivates clusterTest1 in the grand cluster Klazt3rz, checks
+  * that the controller-leader node disappears, verifies that the cluster cannot be
+  * dropped while its processes are still connected, then disconnects every started
+  * process and drops both clusters.
+  */
+ private void testDeactivateCluster() throws Exception, InterruptedException
+ {
+ String command;
+ HelixDataAccessor accessor;
+ String path;
+ // deactivate cluster
+ command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazt3rz false";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ // fixed wait for the deactivation to take effect
+ Thread.sleep(6000);
+
+ // leader node should be gone
+ accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+ path = accessor.keyBuilder().controllerLeader().getPath();
+ Assert.assertFalse(_gZkClient.exists(path));
+
+ // dropping must fail while the started processes are still connected
+ command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+ assertClusterSetupException(command);
+
+ // disconnect every process started by the earlier scenario steps
+ for(StartCMResult result : _startCMResultMap.values())
+ {
+ result._manager.disconnect();
+ result._thread.interrupt();
+ }
+
+ // now both clusters can be dropped
+ command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ command = "-zkSvr localhost:2183 -dropCluster Klazt3rz";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+
+ private void testDropAddResource() throws Exception
+ {
+ ZNRecord record = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+ String x = ObjectToJson(record);
+
+ FileWriter fos = new FileWriter("/tmp/temp.log");
+ PrintWriter pw = new PrintWriter(fos);
+ pw.write(x);
+ pw.close();
+
+ String command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ boolean verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ "clusterTest1"));
+ Assert.assertTrue(verifyResult);
+
+ command = "-zkSvr localhost:2183 -addIdealState clusterTest1 db_11 \"/tmp/temp.log\"";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ "clusterTest1"));
+ Assert.assertTrue(verifyResult);
+
+ ZNRecord record2 = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+ Assert.assertTrue(record2.equals(record));
+ }
+
+ /**
+  * Scenario step: adds four more nodes (localhost:12331 .. localhost:12361) to
+  * clusterTest1, runs -expandCluster, starts a dummy process for each new node and
+  * verifies that the cluster converges.
+  */
+ private void testExpandCluster() throws Exception
+ {
+   String command;
+   boolean verifyResult;
+
+   command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:12331;localhost:12341;localhost:12351;localhost:12361";
+   ClusterSetup.processCommandLineArgs(command.split(" "));
+
+   command = "-zkSvr localhost:2183 -expandCluster clusterTest1";
+   ClusterSetup.processCommandLineArgs(command.split(" "));
+
+   // start a process for each of localhost_12331, _12341, _12351, _12361
+   for (int i = 3; i <= 6; i++)
+   {
+     String instanceName = "localhost_123" + i + "1";
+     StartCMResult result =
+         TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", instanceName);
+     _startCMResultMap.put(instanceName, result);
+   }
+
+   verifyResult =
+       ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                             "clusterTest1"));
+   Assert.assertTrue(verifyResult);
+
+   verifyResult =
+       ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                "clusterTest1"));
+   Assert.assertTrue(verifyResult);
+ }
+
+ /**
+  * Scenario step: checks that a connected node can be neither dropped nor swapped
+  * (whether enabled or disabled), then disconnects localhost_1232, swaps it for the
+  * newly added localhost_12320, verifies the old instance config is gone and starts
+  * a dummy process for the replacement.
+  */
+ private void testInstanceOperations() throws Exception
+ {
+   String command;
+   HelixDataAccessor accessor;
+
+   // drop node should fail as not disabled
+   command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1232";
+   assertClusterSetupException(command);
+
+   // disabled node
+   command = "-zkSvr localhost:2183 -enableInstance clusterTest1 localhost:1232 false";
+   ClusterSetup.processCommandLineArgs(command.split(" "));
+
+   // Cannot drop / swap
+   command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1232";
+   assertClusterSetupException(command);
+
+   command = "-zkSvr localhost:2183 -swapInstance clusterTest1 localhost_1232 localhost_12320";
+   assertClusterSetupException(command);
+
+   // disconnect the node
+   StartCMResult oldNode = _startCMResultMap.get("localhost_1232");
+   oldNode._manager.disconnect();
+   oldNode._thread.interrupt();
+
+   // add new node then swap instance
+   command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:12320";
+   ClusterSetup.processCommandLineArgs(command.split(" "));
+
+   // swap instance. The instance get swapped out should not exist anymore
+   command = "-zkSvr localhost:2183 -swapInstance clusterTest1 localhost_1232 localhost_12320";
+   ClusterSetup.processCommandLineArgs(command.split(" "));
+
+   accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+   String path = accessor.keyBuilder().instanceConfig("localhost_1232").getPath();
+   Assert.assertFalse(_gZkClient.exists(path));
+
+   _startCMResultMap.put("localhost_12320", TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_12320"));
+ }
+
+ /**
+  * Scenario step: starts six dummy participants for clusterTest1 and two
+  * distributed controllers in the grand cluster Klazt3rz, activates clusterTest1 in
+  * the grand cluster, and verifies that a controller leader is elected and that the
+  * cluster converges. Every started process is recorded in _startCMResultMap.
+  */
+ private void testStartCluster() throws Exception, InterruptedException
+ {
+ String command;
+ //start mock nodes
+ for(int i = 0; i < 6 ; i++)
+ {
+ StartCMResult result =
+ TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123"+i);
+ _startCMResultMap.put("localhost_123"+i, result);
+ }
+
+ //start controller nodes
+ for(int i = 0; i< 2; i++)
+ {
+ StartCMResult result =
+ TestHelper.startController("Klazt3rz", "controller_900" + i, ZK_ADDR, HelixControllerMain.DISTRIBUTED);
+
+ _startCMResultMap.put("controller_900" + i, result);
+ }
+ // fixed wait after starting the processes
+ Thread.sleep(100);
+
+ // activate clusters
+ // wrong grand clustername
+ command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazters true";
+ assertClusterSetupException(command);
+
+ // wrong cluster name
+ command = "-zkSvr localhost:2183 -activateCluster clusterTest2 Klazt3rs true";
+ assertClusterSetupException(command);
+
+ // correct names: activation must succeed
+ command = "-zkSvr localhost:2183 -activateCluster clusterTest1 Klazt3rz true";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ // fixed wait for the activation to take effect
+ Thread.sleep(500);
+
+ // dropping a cluster with running processes must fail
+ command = "-zkSvr localhost:2183 -dropCluster clusterTest1";
+ assertClusterSetupException(command);
+
+ // verify leader node
+ HelixDataAccessor accessor = _startCMResultMap.get("controller_9001")._manager.getHelixDataAccessor();
+ LiveInstance controllerLeader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+ Assert.assertTrue(controllerLeader.getInstanceName().startsWith("controller_900"));
+
+ // the leader seen through a participant's accessor must also be one of the controllers
+ accessor = _startCMResultMap.get("localhost_1232")._manager.getHelixDataAccessor();
+ LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+ Assert.assertTrue(leader.getInstanceName().startsWith("controller_900"));
+
+ boolean verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ "clusterTest1"));
+ Assert.assertTrue(verifyResult);
+
+ verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ "clusterTest1"));
+ Assert.assertTrue(verifyResult);
+ }
+
+ private void testRebalanceResource() throws Exception
+ {
+ String command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_11 3";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // re-add and rebalance
+ command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 48 MasterSlave";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_11 3";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // rebalance with key prefix
+ command = "-zkSvr localhost:2183 -rebalance clusterTest1 db_22 2 -key alias";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+
+ /**
+  * Scenario step: adds nodes localhost:1230 .. localhost:1236 to clusterTest1,
+  * checks the drop preconditions (an enabled or unknown node cannot be dropped),
+  * drops localhost:1236 after disabling it, adds two controller nodes to Klazt3rz,
+  * checks that a duplicate node is rejected, and finally drops and re-adds db_11.
+  */
+ private void testAddInstance() throws Exception
+ {
+ String command;
+ // add localhost:1230 .. localhost:1232 one at a time
+ for(int i = 0; i < 3; i++)
+ {
+ command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:123"+i;
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+ // add four more nodes with a single semicolon-separated argument
+ command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:1233;localhost:1234;localhost:1235;localhost:1236";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // delete one node without disable
+ command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1236";
+ assertClusterSetupException(command);
+
+ // delete non-exist node
+ command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:12367";
+ assertClusterSetupException(command);
+
+ // disable node
+ command = "-zkSvr localhost:2183 -enableInstance clusterTest1 localhost:1236 false";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // the disabled node can now be dropped
+ command = "-zkSvr localhost:2183 -dropNode clusterTest1 localhost:1236";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // add node to controller cluster
+ command = "-zkSvr localhost:2183 -addNode Klazt3rz controller:9000;controller:9001";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ // add a dup host
+ command = "-zkSvr localhost:2183 -addNode clusterTest1 localhost:1234";
+ assertClusterSetupException(command);
+
+ // drop and add resource
+ command = "-zkSvr localhost:2183 -dropResource clusterTest1 db_11 ";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ command = "-zkSvr localhost:2183 -addResource clusterTest1 db_11 12 MasterSlave";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java b/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
new file mode 100644
index 0000000..509d6c0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestZKClientPool.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.ZKClientPool;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that a {@link ZkClient} handed out by {@link ZKClientPool} can be used again after the
+ * ZooKeeper server it points at has been stopped and started again.
+ */
+public class TestZKClientPool
+{
+
+  /**
+   * Starts a ZooKeeper server on localhost:2187, writes and reads back one record through a pooled
+   * client, restarts the server, and then expects that reading the same path through the pool
+   * fails with {@link ZkNoNodeException} before the record is created and read once more.
+   *
+   * @throws Exception if starting/stopping the server or any ZooKeeper operation fails unexpectedly
+   */
+  @Test
+  public void test() throws Exception
+  {
+    String testName = "TestZKClientPool";
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String zkAddr = "localhost:2187";
+    ZkServer zkServer = TestHelper.startZkSever(zkAddr);
+    try
+    {
+      ZkClient zkClient = ZKClientPool.getZkClient(zkAddr);
+
+      zkClient.createPersistent("/" + testName, new ZNRecord(testName));
+      ZNRecord record = zkClient.readData("/" + testName);
+      Assert.assertEquals(record.getId(), testName);
+
+      TestHelper.stopZkServer(zkServer);
+      // Cleared so the finally block does not stop the same server twice if the restart fails.
+      zkServer = null;
+
+      // restart zk
+      zkServer = TestHelper.startZkSever(zkAddr);
+      try
+      {
+        zkClient = ZKClientPool.getZkClient(zkAddr);
+        record = zkClient.readData("/" + testName);
+        Assert.fail("should fail on zk no node exception");
+      } catch (ZkNoNodeException e)
+      {
+        // OK
+      } catch (Exception e)
+      {
+        // Pass the cause along so the report shows what actually went wrong.
+        Assert.fail("should not fail on exception other than ZkNoNodeException", e);
+      }
+
+      zkClient.createPersistent("/" + testName, new ZNRecord(testName));
+      record = zkClient.readData("/" + testName);
+      Assert.assertEquals(record.getId(), testName);
+
+      zkClient.close();
+    }
+    finally
+    {
+      // Always release the port, even when an assertion above fails, so that a failure here
+      // does not leave a server running for whatever test runs next.
+      if (zkServer != null)
+      {
+        TestHelper.stopZkServer(zkServer);
+      }
+    }
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/CMConnector.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/CMConnector.java b/mockservice/src/main/java/org/apache/helix/CMConnector.java
new file mode 100644
index 0000000..5eaa8a1
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/CMConnector.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+
+
+/**
+ * Small holder around a {@link HelixManager} that is created as a
+ * {@link InstanceType#PARTICIPANT} and connected as soon as this object is constructed.
+ */
+public class CMConnector {
+
+  HelixManager _manager;
+
+  /**
+   * Builds the participant manager through {@link HelixManagerFactory} and connects it.
+   *
+   * @param clusterName  name of the cluster passed to the factory
+   * @param instanceName name of the instance passed to the factory
+   * @param zkAddr       ZooKeeper address passed to the factory
+   * @throws Exception if creating or connecting the manager fails
+   */
+  public CMConnector(final String clusterName, final String instanceName, final String zkAddr) throws Exception
+  {
+    final HelixManager participant = HelixManagerFactory.getZKHelixManager(
+        clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+    this._manager = participant;
+    this._manager.connect();
+  }
+
+  /**
+   * @return the manager created by the constructor
+   */
+  public HelixManager getManager() {
+    return this._manager;
+  }
+
+  /**
+   * Disconnects the wrapped manager.
+   */
+  public void disconnect() {
+    this._manager.disconnect();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoResource.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoResource.java b/mockservice/src/main/java/org/apache/helix/EspressoResource.java
new file mode 100644
index 0000000..30e8853
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoResource.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.Context;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.data.Status;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.StringRepresentation;
+import org.restlet.resource.Variant;
+
+
+public class EspressoResource extends Resource {
+
+ private static final int POST_BODY_BUFFER_SIZE = 1024*10;
+
+ private static final Logger logger = Logger
+ .getLogger(EspressoResource.class);
+
+ Context _context;
+
+ public EspressoResource(Context context,
+ Request request,
+ Response response)
+ {
+ super(context, request, response);
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ _context = context;
+ }
+
+ public boolean allowGet()
+ {
+ System.out.println("PutResource.allowGet()");
+ return true;
+ }
+
+ public boolean allowPost()
+ {
+ System.out.println("PutResource.allowPost()");
+ return true;
+ }
+
+ public boolean allowPut()
+ {
+ System.out.println("PutResource.allowPut()");
+ return true;
+ }
+
+ public boolean allowDelete()
+ {
+ return false;
+ }
+
+ /*
+ * Handle get requests
+ * @see org.restlet.resource.Resource#represent(org.restlet.resource.Variant)
+ */
+ public Representation represent(Variant variant)
+ {
+ StringRepresentation presentation = null;
+ try
+ {
+ String databaseId = (String)getRequest().getAttributes().get(MockEspressoService.DATABASENAME);
+ String tableId = (String)getRequest().getAttributes().get(MockEspressoService.TABLENAME);
+ String resourceId = (String)getRequest().getAttributes().get(MockEspressoService.RESOURCENAME);
+ String subResourceId = (String)getRequest().getAttributes().get(MockEspressoService.SUBRESOURCENAME);
+ logger.debug("Done getting request components");
+ logger.debug("method: "+getRequest().getMethod());
+ String composedKey = databaseId + tableId + resourceId; // + subResourceId;
+ EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+
+
+ if (getRequest().getMethod() == Method.PUT) {
+ logger.debug("processing PUT");
+ Reader postBodyReader;
+ //TODO: get to no fixed size on buffer
+ char[] postBody = new char[POST_BODY_BUFFER_SIZE];
+ postBodyReader = getRequest().getEntity().getReader();
+ postBodyReader.read(postBody);
+ logger.debug("postBody: "+new String(postBody));
+ mock.doPut(databaseId, composedKey, new String(postBody));
+ presentation = new StringRepresentation("Put succeeded", MediaType.APPLICATION_JSON);
+ }
+ else if (getRequest().getMethod() == Method.GET) {
+ logger.debug("processing GET");
+ String result = mock.doGet(databaseId, composedKey);
+ logger.debug("result: "+result);
+ if (result == null) {
+ presentation = new StringRepresentation("Record not found", MediaType.APPLICATION_JSON);
+ getResponse().setStatus(Status.CLIENT_ERROR_NOT_FOUND,"Record not found");
+ }
+ else {
+ getResponse().setStatus(Status.SUCCESS_OK,"Success");
+ presentation = new StringRepresentation(result, MediaType.APPLICATION_JSON);
+ }
+ }
+ }
+
+ catch (IOException e) {
+ presentation = new StringRepresentation(e.getMessage(), MediaType.APPLICATION_JSON);
+ e.printStackTrace();
+ }
+
+ catch(Exception e)
+ {
+ String error = "Error with op"; //ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+ e.printStackTrace();
+ }
+ return presentation;
+ }
+
+ /*
+ * Handle put requests (non-Javadoc)
+ * @see org.restlet.resource.Resource#storeRepresentation(org.restlet.resource.Representation)
+ */
+ public void storeRepresentation(Representation entity) throws ResourceException {
+ logger.debug("in storeRepresentation");
+ StringRepresentation presentation = null;
+ // try {
+ Form requestHeaders = (Form) getRequest().getAttributes().get("org.restlet.http.headers");
+ Map<String, String> headerMap = requestHeaders.getValuesMap();
+ logger.debug("HEADERS MAP");
+ for (String key : headerMap.keySet()) {
+ logger.debug(key+" : "+headerMap.get(key));
+ }
+ // } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ //e1.printStackTrace();
+ //}
+ try
+ {
+ logger.debug("in PutResource handle");
+ String databaseId = (String)getRequest().getAttributes().get(MockEspressoService.DATABASENAME);
+ String tableId = (String)getRequest().getAttributes().get(MockEspressoService.TABLENAME);
+ String resourceId = (String)getRequest().getAttributes().get(MockEspressoService.RESOURCENAME);
+ String subResourceId = (String)getRequest().getAttributes().get(MockEspressoService.SUBRESOURCENAME);
+ logger.debug("Done getting request components");
+ logger.debug("method: "+getRequest().getMethod());
+ String composedKey = databaseId + tableId + resourceId; // + subResourceId;
+ EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+
+ if (getRequest().getMethod() == Method.PUT) {
+ logger.debug("processing PUT");
+ Reader postBodyReader;
+ //TODO: get to no fixed size on buffer
+ char[] postBody = new char[POST_BODY_BUFFER_SIZE];
+ postBodyReader = getRequest().getEntity().getReader();
+ postBodyReader.read(postBody);
+ logger.debug("postBody: "+new String(postBody));
+ mock.doPut(databaseId, composedKey, new String(postBody));
+ presentation = new StringRepresentation("Put succeeded", MediaType.APPLICATION_JSON);
+ }
+ else if (getRequest().getMethod() == Method.GET) {
+ logger.debug("Processing GET");
+ String result = mock.doGet(databaseId, composedKey);
+ logger.debug("result: "+result);
+ if (result == null) {
+ presentation = new StringRepresentation("Record not found", MediaType.APPLICATION_JSON);
+ }
+ else {
+ presentation = new StringRepresentation(result, MediaType.APPLICATION_JSON);
+ }
+ }
+ }
+
+ catch (IOException e) {
+ presentation = new StringRepresentation(e.getMessage(), MediaType.APPLICATION_JSON);
+ e.printStackTrace();
+ }
+
+ catch(Exception e)
+ {
+ String error = "Error with op";
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+ e.printStackTrace();
+ }
+ finally {
+ entity.release();
+ }
+ }
+}