You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[28/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
new file mode 100644
index 0000000..291cd1a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -0,0 +1,258 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKCallback extends ZkUnitTestBase
+{
+ private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
+
+ ZkClient _zkClient;
+
+ private static String[] createArgs(String str)
+ {
+ String[] split = str.split("[ ]+");
+ System.out.println(Arrays.toString(split));
+ return split;
+ }
+
+ public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
+ ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+ IdealStateChangeListener
+ {
+ boolean externalViewChangeReceived = false;
+ boolean liveInstanceChangeReceived = false;
+ boolean configChangeReceived = false;
+ boolean currentStateChangeReceived = false;
+ boolean messageChangeReceived = false;
+ boolean idealStateChangeReceived = false;
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ externalViewChangeReceived = true;
+ }
+
+ @Override
+ public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+ NotificationContext changeContext)
+ {
+ currentStateChangeReceived = true;
+ }
+
+ @Override
+ public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
+ {
+ configChangeReceived = true;
+ }
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext)
+ {
+ liveInstanceChangeReceived = true;
+ }
+
+ @Override
+ public void onMessage(String instanceName, List<Message> messages,
+ NotificationContext changeContext)
+ {
+ messageChangeReceived = true;
+ }
+
+ void Reset()
+ {
+ externalViewChangeReceived = false;
+ liveInstanceChangeReceived = false;
+ configChangeReceived = false;
+ currentStateChangeReceived = false;
+ messageChangeReceived = false;
+ idealStateChangeReceived = false;
+ }
+
+ @Override
+ public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
+ {
+ // TODO Auto-generated method stub
+ idealStateChangeReceived = true;
+ }
+ }
+
+ @Test()
+ public void testInvocation() throws Exception
+ {
+
+ HelixManager testHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
+ "localhost_8900", InstanceType.PARTICIPANT, ZK_ADDR);
+ testHelixManager.connect();
+
+ TestZKCallback test = new TestZKCallback();
+
+ TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
+
+ testHelixManager.addMessageListener(testListener, "localhost_8900");
+ testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
+ testHelixManager.getSessionId());
+ testHelixManager.addConfigChangeListener(testListener);
+ testHelixManager.addIdealStateChangeListener(testListener);
+ testHelixManager.addExternalViewChangeListener(testListener);
+ testHelixManager.addLiveInstanceChangeListener(testListener);
+ // Initial add listener should trigger the first execution of the
+ // listener callbacks
+ AssertJUnit.assertTrue(testListener.configChangeReceived
+ & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
+ & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived
+ & testListener.messageChangeReceived);
+
+ testListener.Reset();
+ HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView = new ExternalView("db-12345");
+ accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
+ Thread.sleep(100);
+ AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
+ testListener.Reset();
+
+ CurrentState curState = new CurrentState("db-12345");
+ curState.setSessionId("sessionId");
+ curState.setStateModelDefRef("StateModelDef");
+ accessor.setProperty(keyBuilder.currentState("localhost_8900", testHelixManager.getSessionId(), curState.getId()), curState);
+ Thread.sleep(100);
+ AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
+ testListener.Reset();
+
+ IdealState idealState = new IdealState("db-1234");
+ idealState.setNumPartitions(400);
+ idealState.setReplicas(Integer.toString(2));
+ idealState.setStateModelDefRef("StateModeldef");
+ accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
+ Thread.sleep(100);
+ AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+ testListener.Reset();
+
+ // dummyRecord = new ZNRecord("db-12345");
+ // dataAccessor.setProperty(PropertyType.IDEALSTATES, idealState, "db-12345"
+ // );
+ // Thread.sleep(100);
+ // AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+ // testListener.Reset();
+
+ // dummyRecord = new ZNRecord("localhost:8900");
+ // List<ZNRecord> recList = new ArrayList<ZNRecord>();
+ // recList.add(dummyRecord);
+
+ testListener.Reset();
+ Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+ message.setTgtSessionId("*");
+ message.setResourceName("testResource");
+ message.setPartitionName("testPartitionKey");
+ message.setStateModelDef("MasterSlave");
+ message.setToState("toState");
+ message.setFromState("fromState");
+ message.setTgtName("testTarget");
+ message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+
+ accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
+ Thread.sleep(500);
+ AssertJUnit.assertTrue(testListener.messageChangeReceived);
+
+ // dummyRecord = new ZNRecord("localhost_9801");
+ LiveInstance liveInstance = new LiveInstance("localhost_9801");
+ liveInstance.setSessionId(UUID.randomUUID().toString());
+ liveInstance.setHelixVersion(UUID.randomUUID().toString());
+ accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
+ Thread.sleep(500);
+ AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
+ testListener.Reset();
+
+ // dataAccessor.setNodeConfigs(recList); Thread.sleep(100);
+ // AssertJUnit.assertTrue(testListener.configChangeReceived);
+ // testListener.Reset();
+ }
+
+ @BeforeClass()
+ public void beforeClass() throws IOException, Exception
+ {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ if (_zkClient.exists("/" + clusterName))
+ {
+ _zkClient.deleteRecursive("/" + clusterName);
+ }
+
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addCluster "
+ + clusterName));
+ // ClusterSetup
+ // .processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR +
+ // " -addCluster relay-cluster-12345"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addResource "
+ + clusterName + " db-12345 120 MasterSlave"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ + " localhost:8900"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ + " localhost:8901"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ + " localhost:8902"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ + " localhost:8903"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
+ + " localhost:8904"));
+ ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -rebalance "
+ + clusterName + " db-12345 3"));
+ }
+
+ @AfterClass()
+ public void afterClass()
+ {
+ _zkClient.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java b/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
new file mode 100644
index 0000000..92c858c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZKRoutingInfoProvider.java
@@ -0,0 +1,208 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.ExternalViewGenerator;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.CurrentState.CurrentStateProperty;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZKRoutingInfoProvider
+{
+ public Map<String, List<ZNRecord>> createCurrentStates(String[] dbNames,
+ String[] nodeNames, int[] partitions, int[] replicas)
+ {
+ Map<String, List<ZNRecord>> currentStates = new TreeMap<String, List<ZNRecord>>();
+ Map<String, Map<String, ZNRecord>> currentStates2 = new TreeMap<String, Map<String, ZNRecord>>();
+
+ Map<String, String> stateMaster = new TreeMap<String, String>();
+ stateMaster.put(CurrentStateProperty.CURRENT_STATE.toString(), "MASTER");
+
+ Map<String, String> stateSlave = new TreeMap<String, String>();
+ stateSlave.put(CurrentStateProperty.CURRENT_STATE.toString(), "SLAVE");
+
+ for (int i = 0; i < nodeNames.length; i++)
+ {
+ currentStates.put(nodeNames[i], new ArrayList<ZNRecord>());
+ currentStates2.put(nodeNames[i], new TreeMap<String, ZNRecord>());
+ for (int j = 0; j < dbNames.length; j++)
+ {
+ ZNRecord dbPartitionState = new ZNRecord(dbNames[j]);
+ currentStates2.get(nodeNames[i]).put(dbNames[j], dbPartitionState);
+ }
+ }
+
+ Random rand = new Random(1234);
+ for (int j = 0; j < dbNames.length; j++)
+ {
+ int partition = partitions[j];
+ ArrayList<Integer> randomArray = new ArrayList<Integer>();
+ for (int i = 0; i < partition; i++)
+ {
+ randomArray.add(i);
+ }
+ Collections.shuffle(randomArray, rand);
+
+ for (int i = 0; i < partition; i++)
+ {
+ stateMaster.put(Message.Attributes.RESOURCE_NAME.toString(),
+ dbNames[j]);
+ stateSlave.put(Message.Attributes.RESOURCE_NAME.toString(),
+ dbNames[j]);
+ int nodes = nodeNames.length;
+ int master = randomArray.get(i) % nodes;
+ String partitionName = dbNames[j] + ".partition-" + i;
+ Map<String, Map<String, String>> map = (currentStates2
+ .get(nodeNames[master]).get(dbNames[j]).getMapFields());
+ assert (map != null);
+ map.put(partitionName, stateMaster);
+
+ for (int k = 1; k <= replicas[j]; k++)
+ {
+ int slave = (master + k) % nodes;
+ Map<String, Map<String, String>> map2 = currentStates2
+ .get(nodeNames[slave]).get(dbNames[j]).getMapFields();
+
+ map2.put(partitionName, stateSlave);
+ }
+ }
+ }
+ for (String nodeName : currentStates2.keySet())
+ {
+ Map<String, ZNRecord> recMap = currentStates2.get(nodeName);
+ List<ZNRecord> list = new ArrayList<ZNRecord>();
+ for (ZNRecord rec : recMap.values())
+ {
+ list.add(rec);
+ }
+ currentStates.put(nodeName, list);
+ }
+ return currentStates;
+ }
+
+ private void verify(Map<String, List<ZNRecord>> currentStates,
+ Map<String, Map<String, Set<String>>> routingMap)
+ {
+ int counter1 = 0;
+ int counter2 = 0;
+ for (String nodeName : currentStates.keySet())
+ {
+ List<ZNRecord> dbStateList = currentStates.get(nodeName);
+ for (ZNRecord dbState : dbStateList)
+ {
+ Map<String, Map<String, String>> dbStateMap = dbState.getMapFields();
+ for (String partitionName : dbStateMap.keySet())
+ {
+ Map<String, String> stateMap = dbStateMap
+ .get(partitionName);
+ String state = stateMap
+ .get(CurrentStateProperty.CURRENT_STATE.toString());
+ AssertJUnit.assertTrue(routingMap.get(partitionName).get(state)
+ .contains(nodeName));
+ counter1++;
+ }
+ }
+ }
+
+ for (String partitionName : routingMap.keySet())
+ {
+ Map<String, Set<String>> partitionState = routingMap.get(partitionName);
+ for (String state : partitionState.keySet())
+ {
+ counter2 += partitionState.get(state).size();
+ }
+ }
+ AssertJUnit.assertTrue(counter2 == counter1);
+ }
+
+ // public static void main(String[] args)
+ @Test ()
+ public void testInvocation() throws Exception
+ {
+ String[] dbNames = new String[3];
+ for (int i = 0; i < dbNames.length; i++)
+ {
+ dbNames[i] = "DB_" + i;
+ }
+ String[] nodeNames = new String[6];
+ for (int i = 0; i < nodeNames.length; i++)
+ {
+ nodeNames[i] = "LOCALHOST_100" + i;
+ }
+
+ int[] partitions = new int[dbNames.length];
+ for (int i = 0; i < partitions.length; i++)
+ {
+ partitions[i] = (i + 1) * 10;
+ }
+
+ int[] replicas = new int[dbNames.length];
+ for (int i = 0; i < replicas.length; i++)
+ {
+ replicas[i] = 3;
+ }
+ Map<String, List<ZNRecord>> currentStates = createCurrentStates(dbNames,
+ nodeNames, partitions, replicas);
+ ExternalViewGenerator provider = new ExternalViewGenerator();
+
+ List<ZNRecord> mockIdealStates = new ArrayList<ZNRecord>();
+ for (String dbName : dbNames)
+ {
+ ZNRecord rec = new ZNRecord(dbName);
+ mockIdealStates.add(rec);
+ }
+ List<ZNRecord> externalView = provider.computeExternalView(currentStates,
+ mockIdealStates);
+
+ Map<String, Map<String, Set<String>>> routingMap = provider
+ .getRouterMapFromExternalView(externalView);
+
+ verify(currentStates, routingMap);
+
+ /* write current state and external view to ZK */
+ /*
+ * String clusterName = "test-cluster44"; ZkClient zkClient = new
+ * ZkClient("localhost:2181"); zkClient.setZkSerializer(new
+ * ZNRecordSerializer());
+ *
+ * for(String nodeName : currentStates.keySet()) {
+ * if(zkClient.exists(CMUtil.getCurrentStatePath(clusterName, nodeName))) {
+ * zkClient.deleteRecursive(CMUtil.getCurrentStatePath(clusterName,
+ * nodeName)); } ZKUtil.createChildren(zkClient,CMUtil.getCurrentStatePath
+ * (clusterName, nodeName), currentStates.get(nodeName)); }
+ *
+ * //List<ZNRecord> externalView =
+ * ZKRoutingInfoProvider.computeExternalView(currentStates); String
+ * routingTablePath = CMUtil.getExternalViewPath(clusterName);
+ * if(zkClient.exists(routingTablePath)) {
+ * zkClient.deleteRecursive(routingTablePath); }
+ *
+ * ZKUtil.createChildren(zkClient, CMUtil.getExternalViewPath(clusterName),
+ * externalView);
+ */
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
new file mode 100644
index 0000000..2041da3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecord
+{
+
+ @Test
+ public void testEquals()
+ {
+ ZNRecord record1 = new ZNRecord("id");
+ record1.setSimpleField("k1", "v1");
+ record1.setMapField("m1", new HashMap<String, String>());
+ record1.getMapField("m1").put("k1", "v1");
+ record1.setListField("l1", new ArrayList<String>());
+ record1.getListField("l1").add("v1");
+ ZNRecord record2 = new ZNRecord("id");
+ record2.setSimpleField("k1", "v1");
+ record2.setMapField("m1", new HashMap<String, String>());
+ record2.getMapField("m1").put("k1", "v1");
+ record2.setListField("l1", new ArrayList<String>());
+ record2.getListField("l1").add("v1");
+
+ AssertJUnit.assertEquals(record1, record2);
+ record2.setSimpleField("k2", "v1");
+ AssertJUnit.assertNotSame(record1, record2);
+ }
+
+ @Test
+ public void testMerge()
+ {
+ ZNRecord record = new ZNRecord("record1");
+
+ // set simple field
+ record.setSimpleField("simpleKey1", "simpleValue1");
+
+ // set list field
+ List<String> list1 = new ArrayList<String>();
+ list1.add("list1Value1");
+ list1.add("list1Value2");
+ record.setListField("listKey1", list1);
+
+ // set map field
+ Map<String, String> map1 = new HashMap<String, String>();
+ map1.put("map1Key1", "map1Value1");
+ record.setMapField("mapKey1", map1);
+ // System.out.println(record);
+
+ ZNRecord updateRecord = new ZNRecord("updateRecord");
+
+ // set simple field
+ updateRecord.setSimpleField("simpleKey2", "simpleValue2");
+
+ // set list field
+ List<String> newList1 = new ArrayList<String>();
+ newList1.add("list1Value1");
+ newList1.add("list1Value2");
+ newList1.add("list1NewValue1");
+ newList1.add("list1NewValue2");
+ updateRecord.setListField("listKey1", newList1);
+
+ List<String> list2 = new ArrayList<String>();
+ list2.add("list2Value1");
+ list2.add("list2Value2");
+ updateRecord.setListField("listKey2", list2);
+
+ // set map field
+ Map<String, String> newMap1 = new HashMap<String, String>();
+ newMap1.put("map1NewKey1", "map1NewValue1");
+ updateRecord.setMapField("mapKey1", newMap1);
+
+ Map<String, String> map2 = new HashMap<String, String>();
+ map2.put("map2Key1", "map2Value1");
+ updateRecord.setMapField("mapKey2", map2);
+ // System.out.println(updateRecord);
+
+ record.merge(updateRecord);
+ // System.out.println(record);
+
+ ZNRecord expectRecord = new ZNRecord("record1");
+ expectRecord.setSimpleField("simpleKey1", "simpleValue1");
+ expectRecord.setSimpleField("simpleKey2", "simpleValue2");
+ List<String> expectList1 = new ArrayList<String>();
+ expectList1.add("list1Value1");
+ expectList1.add("list1Value2");
+ expectList1.add("list1Value1");
+ expectList1.add("list1Value2");
+ expectList1.add("list1NewValue1");
+ expectList1.add("list1NewValue2");
+ expectRecord.setListField("listKey1", expectList1);
+ List<String> expectList2 = new ArrayList<String>();
+ expectList2.add("list2Value1");
+ expectList2.add("list2Value2");
+ expectRecord.setListField("listKey2", expectList2);
+ Map<String, String> expectMap1 = new HashMap<String, String>();
+ expectMap1.put("map1Key1", "map1Value1");
+ expectMap1.put("map1NewKey1", "map1NewValue1");
+ expectRecord.setMapField("mapKey1", expectMap1);
+ Map<String, String> expectMap2 = new HashMap<String, String>();
+ expectMap2.put("map2Key1", "map2Value1");
+ expectRecord.setMapField("mapKey2", expectMap2);
+ Assert.assertEquals(record, expectRecord, "Should be equal.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java b/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
new file mode 100644
index 0000000..3576177
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecordBucketizer.java
@@ -0,0 +1,41 @@
+package org.apache.helix;
+
+import org.apache.helix.ZNRecordBucketizer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecordBucketizer
+{
+ @Test
+ public void testZNRecordBucketizer()
+ {
+ final int bucketSize = 3;
+ ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
+ String[] partitionNames =
+ { "TestDB_0", "TestDB_1", "TestDB_2", "TestDB_3", "TestDB_4" };
+ for (int i = 0; i < partitionNames.length; i++)
+ {
+ String partitionName = partitionNames[i];
+ String bucketName = bucketizer.getBucketName(partitionName);
+ int startBucketNb = i / bucketSize * bucketSize;
+ int endBucketNb = startBucketNb + bucketSize - 1;
+ String expectBucketName = "TestDB_p" + startBucketNb + "-p" + endBucketNb;
+ System.out.println("Expect: " + expectBucketName + ", actual: " + bucketName);
+ Assert.assertEquals(expectBucketName, bucketName);
+
+ }
+
+// ZNRecord record = new ZNRecord("TestDB");
+// record.setSimpleField("key0", "value0");
+// record.setSimpleField("key1", "value1");
+// record.setListField("TestDB_0", Arrays.asList("localhost_00", "localhost_01"));
+// record.setListField("TestDB_1", Arrays.asList("localhost_10", "localhost_11"));
+// record.setListField("TestDB_2", Arrays.asList("localhost_20", "localhost_21"));
+// record.setListField("TestDB_3", Arrays.asList("localhost_30", "localhost_31"));
+// record.setListField("TestDB_4", Arrays.asList("localhost_40", "localhost_41"));
+//
+// System.out.println(bucketizer.bucketize(record));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
new file mode 100644
index 0000000..9b3fcdb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkClientWrapper extends ZkUnitTestBase
+{
+ ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass()
+ {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close();
+ }
+
+ @Test ()
+ void testGetStat()
+ {
+ String path = "/tmp/getStatTest";
+ _zkClient.deleteRecursive(path);
+
+ Stat stat, newStat;
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNull(stat);
+ _zkClient.createPersistent(path, true);
+
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNotNull(stat);
+
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertEquals(stat, newStat);
+
+ _zkClient.writeData(path, new ZNRecord("Test"));
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertNotSame(stat, newStat);
+ }
+
+ @Test ()
+ void testSessioExpire()
+ {
+ IZkStateListener listener = new IZkStateListener()
+ {
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ System.out.println("In Old connection New state " + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ System.out.println("In Old connection New session");
+ }
+ };
+ _zkClient.subscribeStateChanges(listener);
+ ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+ ZooKeeper zookeeper = connection.getZookeeper();
+ System.out.println("old sessionId= " + zookeeper.getSessionId());
+ try
+ {
+ Watcher watcher = new Watcher(){
+ @Override
+ public void process(WatchedEvent event)
+ {
+ System.out.println("In New connection In process event:"+ event);
+ }
+ };
+ ZooKeeper newZookeeper = new ZooKeeper(connection.getServers(),
+ zookeeper.getSessionTimeout(), watcher , zookeeper.getSessionId(),
+ zookeeper.getSessionPasswd());
+ Thread.sleep(3000);
+ System.out.println("New sessionId= " + newZookeeper.getSessionId());
+ Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ connection = ((ZkConnection) _zkClient.getConnection());
+ zookeeper = connection.getZookeeper();
+ System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+ } catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java b/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
new file mode 100644
index 0000000..fdf7887
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkConnectionCount.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+
+// TODO fix this test
+public class TestZkConnectionCount extends ZkUnitTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestZkConnectionCount.class);
+
+ // @Test ()
+ public void testZkConnectionCount()
+ {
+
+// ZkClient zkClient;
+// int nrOfConn = ZkClient.getNumberOfConnections();
+// System.out.println("Number of zk connections made " + nrOfConn);
+//
+// ZkConnection zkConn = new ZkConnection(ZK_ADDR);
+//
+// zkClient = new ZkClient(zkConn);
+// AssertJUnit.assertEquals(nrOfConn + 1, ZkClient.getNumberOfConnections());
+//
+// zkClient = new ZkClient(ZK_ADDR);
+// AssertJUnit.assertEquals(nrOfConn + 2, ZkClient.getNumberOfConnections());
+//
+// zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+// AssertJUnit.assertEquals(nrOfConn + 2, ZkClient.getNumberOfConnections());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
new file mode 100644
index 0000000..fa6b76f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -0,0 +1,278 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.tools.TestCommand;
+import org.apache.helix.tools.TestExecutor;
+import org.apache.helix.tools.TestTrigger;
+import org.apache.helix.tools.ZnodeOpArg;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZnodeModify extends ZkUnitTestBase
+{
+ private static Logger logger = Logger.getLogger(TestZnodeModify.class);
+ private final String PREFIX = "/" + getShortClassName();
+
+ @Test ()
+ public void testBasic() throws Exception
+ {
+ logger.info("RUN: " + new Date(System.currentTimeMillis()));
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+ // test case for the basic flow, no timeout, no data trigger
+ String pathChild1 = PREFIX + "/basic_child1";
+ String pathChild2 = PREFIX + "/basic_child2";
+
+ TestCommand command;
+ ZnodeOpArg arg;
+ arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1");
+ command = new TestCommand(CommandType.MODIFY, arg);
+ commandList.add(command);
+
+ List<String> list = new ArrayList<String>();
+ list.add("listValue1");
+ list.add("listValue2");
+ arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.LIST, "+", "key2", list);
+ command = new TestCommand(CommandType.MODIFY, arg);
+ commandList.add(command);
+
+ ZNRecord record = getExampleZNRecord();
+ arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", record);
+ command = new TestCommand(CommandType.MODIFY, arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "==", "key1");
+ command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, "simpleValue1"), arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.LIST, "==", "key2");
+ command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, list), arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "==");
+ command = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 0, record), arg);
+ commandList.add(command);
+
+ Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+ for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+ {
+ Assert.assertTrue(entry.getValue());
+ }
+
+ logger.info("END: " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test ()
+ public void testDataTrigger() throws Exception
+ {
+ logger.info("RUN: " + new Date(System.currentTimeMillis()));
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+ // test case for data trigger, no timeout
+ String pathChild1 = PREFIX + "/data_trigger_child1";
+ String pathChild2 = PREFIX + "/data_trigger_child2";
+
+ ZnodeOpArg arg;
+ TestCommand command;
+
+ ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1-new");
+ TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 0, "simpleValue1"), arg1);
+ commandList.add(command1);
+
+ ZNRecord record = getExampleZNRecord();
+ ZNRecord recordNew = new ZNRecord(record);
+ recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+ arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", recordNew);
+ command = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 3000, record), arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", record);
+ command = new TestCommand(CommandType.MODIFY, new TestTrigger(1000), arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "!=", "key1");
+ command = new TestCommand(CommandType.VERIFY, new TestTrigger(3100, 0, "simpleValue1-new"), arg);
+ commandList.add(command);
+
+ arg = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "==");
+ command = new TestCommand(CommandType.VERIFY, new TestTrigger(3100, 0, recordNew), arg);
+ commandList.add(command);
+
+ Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+
+ boolean result = results.remove(command1).booleanValue();
+ AssertJUnit.assertFalse(result);
+ for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+ {
+ Assert.assertTrue(entry.getValue());
+ }
+
+ logger.info("END: " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test ()
+ public void testTimeout() throws Exception
+ {
+ logger.info("RUN: " + new Date(System.currentTimeMillis()));
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+ // test case for timeout, no data trigger
+ String pathChild1 = PREFIX + "/timeout_child1";
+ String pathChild2 = PREFIX + "/timeout_child2";
+
+ ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1-new");
+ TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 1000, "simpleValue1"), arg1);
+ commandList.add(command1);
+
+ ZNRecord record = getExampleZNRecord();
+ ZNRecord recordNew = new ZNRecord(record);
+ recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+ arg1 = new ZnodeOpArg(pathChild2, ZnodePropertyType.ZNODE, "+", recordNew);
+ command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 500, record), arg1);
+ commandList.add(command1);
+
+ arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.SIMPLE, "==", "key1");
+ command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 500, "simpleValue1-new"), arg1);
+ commandList.add(command1);
+
+ arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "==");
+ command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(1000, 500, recordNew), arg1);
+ commandList.add(command1);
+
+ Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+ for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+ {
+ Assert.assertFalse(entry.getValue());
+ }
+
+ logger.info("END: " + new Date(System.currentTimeMillis()));
+ }
+
+
+ @Test ()
+ public void testDataTriggerWithTimeout() throws Exception
+ {
+ logger.info("RUN: " + new Date(System.currentTimeMillis()));
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+
+ // test case for data trigger with timeout
+ final String pathChild1 = PREFIX + "/dataTriggerWithTimeout_child1";
+
+ final ZNRecord record = getExampleZNRecord();
+ ZNRecord recordNew = new ZNRecord(record);
+ recordNew.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.AUTO.toString());
+ ZnodeOpArg arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "+", recordNew);
+ TestCommand command1 = new TestCommand(CommandType.MODIFY, new TestTrigger(0, 8000, record), arg1);
+ commandList.add(command1);
+
+ arg1 = new ZnodeOpArg(pathChild1, ZnodePropertyType.ZNODE, "==");
+ command1 = new TestCommand(CommandType.VERIFY, new TestTrigger(9000, 500, recordNew), arg1);
+ commandList.add(command1);
+
+ // start a separate thread to change znode at pathChild1
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(3000);
+ final ZkClient zkClient = new ZkClient(ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient.createPersistent(pathChild1, true);
+ zkClient.writeData(pathChild1, record);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+ Map<TestCommand, Boolean> results = TestExecutor.executeTest(commandList, ZK_ADDR);
+ for (Map.Entry<TestCommand, Boolean> entry : results.entrySet())
+ {
+ Assert.assertTrue(entry.getValue());
+ // System.out.println(entry.getValue() + ":" + entry.getKey());
+ }
+
+ }
+
+ ZkClient _zkClient;
+
+ @BeforeClass ()
+ public void beforeClass()
+ {
+ System.out.println("START " + getShortClassName() + " at "
+ + new Date(System.currentTimeMillis()));
+
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ if (_zkClient.exists(PREFIX))
+ {
+ _zkClient.deleteRecursive(PREFIX);
+ }
+
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close();
+
+ System.out.println("END " + getShortClassName() + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ private ZNRecord getExampleZNRecord()
+ {
+ ZNRecord record = new ZNRecord("TestDB");
+ record.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), IdealStateModeProperty.CUSTOMIZED.toString());
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("localhost_12918", "MASTER");
+ map.put("localhost_12919", "SLAVE");
+ record.setMapField("TestDB_0", map);
+
+ List<String> list = new ArrayList<String>();
+ list.add("localhost_12918");
+ list.add("localhost_12919");
+ record.setListField("TestDB_0", list);
+ return record;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
new file mode 100644
index 0000000..5839f4f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -0,0 +1,175 @@
+package org.apache.helix;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+
+public class ZkTestHelper
+{
+ private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
+
+ static
+ {
+ // Logger.getRootLogger().setLevel(Level.DEBUG);
+ }
+
+ // zkClusterManager that exposes zkclient
+ public static class TestZkHelixManager extends ZKHelixManager
+ {
+
+ public TestZkHelixManager(String clusterName,
+ String instanceName,
+ InstanceType instanceType,
+ String zkConnectString) throws Exception
+ {
+ super(clusterName, instanceName, instanceType, zkConnectString);
+ // TODO Auto-generated constructor stub
+ }
+
+ public ZkClient getZkClient()
+ {
+ return _zkClient;
+ }
+
+ }
+
+ public static void expireSession(final ZkClient zkClient) throws Exception
+ {
+ final CountDownLatch waitExpire = new CountDownLatch(1);
+
+ IZkStateListener listener = new IZkStateListener()
+ {
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ // System.err.println("handleStateChanged. state: " + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ // make sure zkclient is connected again
+ zkClient.waitUntilConnected();
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+
+ LOG.info("handleNewSession. sessionId: "
+ + Long.toHexString(curZookeeper.getSessionId()));
+ waitExpire.countDown();
+ }
+ };
+
+ zkClient.subscribeStateChanges(listener);
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ LOG.info("Process watchEvent: " + event);
+ }
+ };
+
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(connection.getServers(),
+ curZookeeper.getSessionTimeout(),
+ watcher,
+ curZookeeper.getSessionId(),
+ curZookeeper.getSessionPasswd());
+ // wait until connected, then close
+ while (dupZookeeper.getState() != States.CONNECTED)
+ {
+ Thread.sleep(10);
+ }
+ dupZookeeper.close();
+
+ // make sure session expiry really happens
+ waitExpire.await();
+ zkClient.unsubscribeStateChanges(listener);
+
+ connection = (ZkConnection) zkClient.getConnection();
+ curZookeeper = connection.getZookeeper();
+
+ // System.err.println("zk: " + oldZookeeper);
+ LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ }
+
+ /*
+ * stateMap: partition->instance->state
+ */
+ public static boolean verifyState(ZkClient zkclient,
+ String clusterName,
+ String resourceName,
+ Map<String, Map<String, String>> expectStateMap,
+ String op)
+ {
+ boolean result = true;
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(zkclient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
+ Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
+ for (String partition : actualStateMap.keySet())
+ {
+ for (String expectPartiton : expectStateMap.keySet())
+ {
+ if (!partition.matches(expectPartiton))
+ {
+ continue;
+ }
+
+ Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
+ Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
+ for (String instance : actualInstanceStateMap.keySet())
+ {
+ for (String expectInstance : expectStateMap.get(expectPartiton).keySet())
+ {
+ if (!instance.matches(expectInstance))
+ {
+ continue;
+ }
+
+ String actualState = actualInstanceStateMap.get(instance);
+ String expectState = expectInstanceStateMap.get(expectInstance);
+ boolean equals = expectState.equals(actualState);
+ if (op.equals("==") && !equals || op.equals("!=") && equals)
+ {
+ System.out.println(partition + "/" + instance
+ + " state mismatch. actual state: " + actualState + ", but expect: "
+ + expectState + ", op: " + op);
+ result = false;
+ }
+
+ }
+ }
+
+ }
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
new file mode 100644
index 0000000..09d48a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -0,0 +1,433 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+// TODO merge code with ZkIntegrationTestBase
+public class ZkUnitTestBase
+{
+ private static Logger LOG = Logger.getLogger(ZkUnitTestBase.class);
+ protected static ZkServer _zkServer = null;
+ protected static ZkClient _gZkClient;
+
+ public static final String ZK_ADDR = "localhost:2185";
+ protected static final String CLUSTER_PREFIX = "CLUSTER";
+ protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+
+ @BeforeSuite(alwaysRun = true)
+ public void beforeSuite() throws Exception
+ {
+ _zkServer = TestHelper.startZkSever(ZK_ADDR);
+ AssertJUnit.assertTrue(_zkServer != null);
+
+ // System.out.println("Number of open zkClient before ZkUnitTests: "
+ // + ZkClient.getNumberOfConnections());
+
+ _gZkClient = new ZkClient(ZK_ADDR);
+ _gZkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @AfterSuite(alwaysRun = true)
+ public void afterTest()
+ {
+ TestHelper.stopZkServer(_zkServer);
+ _zkServer = null;
+ _gZkClient.close();
+
+ // System.out.println("Number of open zkClient after ZkUnitTests: "
+ // + ZkClient.getNumberOfConnections());
+
+ }
+
+ protected String getShortClassName()
+ {
+ String className = this.getClass().getName();
+ return className.substring(className.lastIndexOf('.') + 1);
+ }
+
+ protected String getCurrentLeader(ZkClient zkClient, String clusterName)
+ {
+ String leaderPath =
+ HelixUtil.getControllerPropertyPath(clusterName, PropertyType.LEADER);
+ ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
+ if (leaderRecord == null)
+ {
+ return null;
+ }
+
+ String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString());
+ return leader;
+ }
+
+ protected void stopCurrentLeader(ZkClient zkClient,
+ String clusterName,
+ Map<String, Thread> threadMap,
+ Map<String, HelixManager> managerMap)
+ {
+ String leader = getCurrentLeader(zkClient, clusterName);
+ Assert.assertTrue(leader != null);
+ System.out.println("stop leader:" + leader + " in " + clusterName);
+ Assert.assertTrue(leader != null);
+
+ HelixManager manager = managerMap.remove(leader);
+ Assert.assertTrue(manager != null);
+ manager.disconnect();
+
+ Thread thread = threadMap.remove(leader);
+ Assert.assertTrue(thread != null);
+ thread.interrupt();
+
+ boolean isNewLeaderElected = false;
+ try
+ {
+ // Thread.sleep(2000);
+ for (int i = 0; i < 5; i++)
+ {
+ Thread.sleep(1000);
+ String newLeader = getCurrentLeader(zkClient, clusterName);
+ if (!newLeader.equals(leader))
+ {
+ isNewLeaderElected = true;
+ System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+ break;
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ if (isNewLeaderElected == false)
+ {
+ System.out.println("fail to elect a new leader elected in " + clusterName);
+ }
+ AssertJUnit.assertTrue(isNewLeaderElected);
+ }
+
+ public void verifyInstance(ZkClient zkClient,
+ String clusterName,
+ String instance,
+ boolean wantExists)
+ {
+ // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+ String instanceConfigsPath =
+ PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString());
+ String instanceConfigPath = instanceConfigsPath + "/" + instance;
+ String instancePath = HelixUtil.getInstancePath(clusterName, instance);
+ AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
+ AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
+ }
+
+ public void verifyResource(ZkClient zkClient,
+ String clusterName,
+ String resource,
+ boolean wantExists)
+ {
+ String resourcePath = HelixUtil.getIdealStatePath(clusterName) + "/" + resource;
+ AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
+ }
+
+ public void verifyEnabled(ZkClient zkClient,
+ String clusterName,
+ String instance,
+ boolean wantEnabled)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
+ AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
+ }
+
+ public void verifyReplication(ZkClient zkClient,
+ String clusterName,
+ String resource,
+ int repl)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
+ for (String partitionName : idealState.getPartitionSet())
+ {
+ if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
+ {
+ AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
+ }
+ else if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+ {
+ AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName)
+ .size());
+ }
+ }
+ }
+
+ protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
+ InterruptedException
+ {
+ ZooKeeper oldZookeeper = zkConnection.getZookeeper();
+ LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ LOG.info("In New connection, process event:" + event);
+ }
+ };
+
+ ZooKeeper newZookeeper =
+ new ZooKeeper(zkConnection.getServers(),
+ oldZookeeper.getSessionTimeout(),
+ watcher,
+ oldZookeeper.getSessionId(),
+ oldZookeeper.getSessionPasswd());
+ LOG.info("New sessionId = " + newZookeeper.getSessionId());
+ // Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ oldZookeeper = zkConnection.getZookeeper();
+ LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+ }
+
+ protected void simulateSessionExpiry(ZkClient zkClient) throws IOException,
+ InterruptedException
+ {
+ IZkStateListener listener = new IZkStateListener()
+ {
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ LOG.info("In Old connection, state changed:" + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ LOG.info("In Old connection, new session");
+ }
+ };
+ zkClient.subscribeStateChanges(listener);
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper oldZookeeper = connection.getZookeeper();
+ LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ LOG.info("In New connection, process event:" + event);
+ }
+ };
+
+ ZooKeeper newZookeeper =
+ new ZooKeeper(connection.getServers(),
+ oldZookeeper.getSessionTimeout(),
+ watcher,
+ oldZookeeper.getSessionId(),
+ oldZookeeper.getSessionPasswd());
+ LOG.info("New sessionId = " + newZookeeper.getSessionId());
+ // Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ connection = (ZkConnection) zkClient.getConnection();
+ oldZookeeper = connection.getZookeeper();
+ LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+ }
+
+ protected void setupStateModel(String clusterName)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ StateModelConfigGenerator generator = new StateModelConfigGenerator();
+ StateModelDefinition masterSlave =
+ new StateModelDefinition(generator.generateConfigForMasterSlave());
+ accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
+
+ StateModelDefinition leaderStandby =
+ new StateModelDefinition(generator.generateConfigForLeaderStandby());
+ accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
+
+ StateModelDefinition onlineOffline =
+ new StateModelDefinition(generator.generateConfigForOnlineOffline());
+ accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
+
+ }
+
+ protected List<IdealState> setupIdealState(String clusterName,
+ int[] nodes,
+ String[] resources,
+ int partitions,
+ int replicas)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ List<IdealState> idealStates = new ArrayList<IdealState>();
+ List<String> instances = new ArrayList<String>();
+ for (int i : nodes)
+ {
+ instances.add("localhost_" + i);
+ }
+
+ for (String resourceName : resources)
+ {
+ IdealState idealState = new IdealState(resourceName);
+ for (int p = 0; p < partitions; p++)
+ {
+ List<String> value = new ArrayList<String>();
+ for (int r = 0; r < replicas; r++)
+ {
+ int n = nodes[(p + r) % nodes.length];
+ value.add("localhost_" + n);
+ }
+ idealState.getRecord().setListField(resourceName + "_" + p, value);
+ }
+
+ idealState.setReplicas(Integer.toString(replicas));
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+ idealState.setNumPartitions(partitions);
+ idealStates.add(idealState);
+
+ // System.out.println(idealState);
+ accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+ }
+ return idealStates;
+ }
+
+ protected void setupLiveInstances(String clusterName, int[] liveInstances)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ for (int i = 0; i < liveInstances.length; i++)
+ {
+ String instance = "localhost_" + liveInstances[i];
+ LiveInstance liveInstance = new LiveInstance(instance);
+ liveInstance.setSessionId("session_" + liveInstances[i]);
+ liveInstance.setHelixVersion("0.0.0");
+ accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+ }
+ }
+
+ protected void setupInstances(String clusterName, int[] instances)
+ {
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ for (int i = 0; i < instances.length; i++)
+ {
+ String instance = "localhost_" + instances[i];
+ InstanceConfig instanceConfig = new InstanceConfig(instance);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setPort("" + instances[i]);
+ instanceConfig.setInstanceEnabled(true);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ }
+
+ protected void runPipeline(ClusterEvent event, Pipeline pipeline)
+ {
+ try
+ {
+ pipeline.handle(event);
+ pipeline.finish();
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while executing pipeline:" + pipeline
+ + ". Will not continue to next pipeline", e);
+ }
+ }
+
+ protected void runStage(ClusterEvent event, Stage stage) throws Exception
+ {
+ StageContext context = new StageContext();
+ stage.init(context);
+ stage.preProcess();
+ stage.process(event);
+ stage.postProcess();
+ }
+
+ protected Message createMessage(MessageType type,
+ String msgId,
+ String fromState,
+ String toState,
+ String resourceName,
+ String tgtName)
+ {
+ Message msg = new Message(type.toString(), msgId);
+ msg.setFromState(fromState);
+ msg.setToState(toState);
+ msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
+ msg.setTgtName(tgtName);
+ return msg;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
new file mode 100644
index 0000000..acce1f5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAddAlerts.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestAddAlerts
+{
+
+ protected static final String CLUSTER_NAME = "TestCluster";
+
+ MockManager _helixManager;
+ AlertsHolder _alertsHolder;
+
+ public final String EXP = AlertParser.EXPRESSION_NAME;
+ public final String CMP = AlertParser.COMPARATOR_NAME;
+ public final String CON = AlertParser.CONSTANT_NAME;
+
+ @BeforeMethod()
+ public void setup()
+ {
+ _helixManager = new MockManager(CLUSTER_NAME);
+ _alertsHolder = new AlertsHolder(_helixManager, new HealthDataCache());
+ }
+
+ public boolean alertRecordContains(ZNRecord rec, String alertName)
+ {
+ Map<String, Map<String, String>> alerts = rec.getMapFields();
+ return alerts.containsKey(alertName);
+ }
+
+ public int alertsSize(ZNRecord rec)
+ {
+ Map<String, Map<String, String>> alerts = rec.getMapFields();
+ return alerts.size();
+ }
+
+ @Test()
+ public void testAddAlert() throws Exception
+ {
+ String alert =
+ EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+ + "(10)";
+ _alertsHolder.addAlert(alert);
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+ System.out.println("alert: " + alert);
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(alertRecordContains(rec, alert));
+ AssertJUnit.assertEquals(1, alertsSize(rec));
+ }
+
+ @Test()
+ public void testAddTwoAlerts() throws Exception
+ {
+ String alert1 =
+ EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+ + "(10)";
+ String alert2 =
+ EXP + "(accumulate()(dbFoo.partition10.latency))" + CMP + "(GREATER)" + CON
+ + "(100)";
+ _alertsHolder.addAlert(alert1);
+ _alertsHolder.addAlert(alert2);
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+ // System.out.println("alert: "+alert1);
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(alertRecordContains(rec, alert1));
+ AssertJUnit.assertTrue(alertRecordContains(rec, alert2));
+ AssertJUnit.assertEquals(2, alertsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddTwoWildcardAlert() throws Exception
+ {
+ String alert1 =
+ EXP + "(accumulate()(dbFoo.partition*.put*))" + CMP + "(GREATER)" + CON + "(10)";
+ _alertsHolder.addAlert(alert1);
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.alerts()).getRecord();
+ // System.out.println("alert: "+alert1);
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(alertRecordContains(rec, alert1));
+ AssertJUnit.assertEquals(1, alertsSize(rec));
+ }
+
+ // add 2 wildcard alert here
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
new file mode 100644
index 0000000..be3b1d9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAddPersistentStats.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestAddPersistentStats
+{
+
+ protected static final String CLUSTER_NAME = "TestCluster";
+
+ MockManager _helixManager;
+ StatsHolder _statsHolder;
+
+ @BeforeMethod(groups = { "unitTest" })
+ public void setup()
+ {
+ _helixManager = new MockManager(CLUSTER_NAME);
+ _statsHolder = new StatsHolder(_helixManager, new HealthDataCache());
+ }
+
+ public boolean statRecordContains(ZNRecord rec, String statName)
+ {
+ Map<String, Map<String, String>> stats = rec.getMapFields();
+ return stats.containsKey(statName);
+ }
+
+ public int statsSize(ZNRecord rec)
+ {
+ Map<String, Map<String, String>> stats = rec.getMapFields();
+ return stats.size();
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddStat() throws Exception
+ {
+ String stat = "window(5)(dbFoo.partition10.latency)";
+ _statsHolder.addStat(stat);
+ _statsHolder.persistStats();
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(statRecordContains(rec, stat));
+ AssertJUnit.assertEquals(1, statsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddTwoStats() throws Exception
+ {
+ String stat1 = "window(5)(dbFoo.partition10.latency)";
+ _statsHolder.addStat(stat1);
+ _statsHolder.persistStats();
+ String stat2 = "window(5)(dbFoo.partition11.latency)";
+ _statsHolder.addStat(stat2);
+ _statsHolder.persistStats();
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(statRecordContains(rec, stat1));
+ AssertJUnit.assertTrue(statRecordContains(rec, stat2));
+ AssertJUnit.assertEquals(2, statsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddDuplicateStat() throws Exception
+ {
+ String stat = "window(5)(dbFoo.partition10.latency)";
+ _statsHolder.addStat(stat);
+ _statsHolder.addStat(stat);
+ _statsHolder.persistStats();
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(statRecordContains(rec, stat));
+ AssertJUnit.assertEquals(1, statsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddPairOfStats() throws Exception
+ {
+ String exp = "accumulate()(dbFoo.partition10.latency, dbFoo.partition10.count)";
+ _statsHolder.addStat(exp);
+ _statsHolder.persistStats();
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(statRecordContains(rec,
+ "accumulate()(dbFoo.partition10.latency)"));
+ AssertJUnit.assertTrue(statRecordContains(rec,
+ "accumulate()(dbFoo.partition10.count)"));
+ AssertJUnit.assertEquals(2, statsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddStatsWithOperators() throws Exception
+ {
+ String exp =
+ "accumulate()(dbFoo.partition10.latency, dbFoo.partition10.count)|EACH|ACCUMULATE|DIVIDE";
+ _statsHolder.addStat(exp);
+ _statsHolder.persistStats();
+
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
+
+ System.out.println("rec: " + rec.toString());
+ AssertJUnit.assertTrue(statRecordContains(rec,
+ "accumulate()(dbFoo.partition10.latency)"));
+ AssertJUnit.assertTrue(statRecordContains(rec,
+ "accumulate()(dbFoo.partition10.count)"));
+ AssertJUnit.assertEquals(2, statsSize(rec));
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddNonExistentAggregator() throws Exception
+ {
+ String exp = "fakeagg()(dbFoo.partition10.latency)";
+ boolean caughtException = false;
+ try
+ {
+ _statsHolder.addStat(exp);
+ }
+ catch (HelixException e)
+ {
+ caughtException = true;
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testGoodAggregatorBadArgs() throws Exception
+ {
+ String exp = "accumulate(10)(dbFoo.partition10.latency)";
+ boolean caughtException = false;
+ try
+ {
+ _statsHolder.addStat(exp);
+ }
+ catch (HelixException e)
+ {
+ caughtException = true;
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddBadNestingStat1() throws Exception
+ {
+ String exp = "window((5)(dbFoo.partition10.latency)";
+ boolean caughtException = false;
+ try
+ {
+ _statsHolder.addStat(exp);
+ }
+ catch (HelixException e)
+ {
+ caughtException = true;
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test(groups = { "unitTest" })
+ public void testAddBadNestingStat2() throws Exception
+ {
+ String exp = "window(5)(dbFoo.partition10.latency))";
+ boolean caughtException = false;
+ try
+ {
+ _statsHolder.addStat(exp);
+ }
+ catch (HelixException e)
+ {
+ caughtException = true;
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java b/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
new file mode 100644
index 0000000..bb08b6c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/alerts/TestAlertValidation.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.alerts.AlertParser;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test
+public class TestAlertValidation {
+
+ public final String EXP = AlertParser.EXPRESSION_NAME;
+ public final String CMP = AlertParser.COMPARATOR_NAME;
+ public final String CON = AlertParser.CONSTANT_NAME;
+
+ @Test
+ public void testSimple() {
+ String alertName = EXP + "(accumulate()(dbFoo.partition10.latency)) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(caughtException);
+ }
+
+ @Test
+ public void testSingleInSingleOut() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition10.latency)|EXPAND) " + CMP
+ + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(caughtException);
+ }
+
+ @Test
+ public void testDoubleInDoubleOut() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition10.latency, dbFoo.partition11.latency)|EXPAND) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test
+ public void testTwoStageOps() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EXPAND|DIVIDE) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(caughtException);
+ }
+
+ @Test
+ public void testTwoListsIntoOne() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition10.latency, dbFoo.partition11.count)|SUM) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(caughtException);
+ }
+
+ @Test
+ public void testSumEach() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition*.latency, dbFoo.partition*.count)|EXPAND|SUMEACH|DIVIDE) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertFalse(caughtException);
+ }
+
+ @Test
+ public void testNeedTwoTuplesGetOne() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition*.latency)|EXPAND|DIVIDE) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test
+ public void testExtraPipe() {
+ String alertName = EXP + "(accumulate()(dbFoo.partition10.latency)|) "
+ + CMP + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+
+ @Test
+ public void testAlertUnknownOp() {
+ String alertName = EXP
+ + "(accumulate()(dbFoo.partition10.latency)|BADOP) " + CMP
+ + "(GREATER) " + CON + "(10)";
+ boolean caughtException = false;
+ try {
+ AlertParser.validateAlert(alertName);
+ } catch (HelixException e) {
+ caughtException = true;
+ e.printStackTrace();
+ }
+ AssertJUnit.assertTrue(caughtException);
+ }
+}