You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[31/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
new file mode 100644
index 0000000..f7768aa
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -0,0 +1,328 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.model.AlertHistory;
+import org.apache.helix.model.HealthStat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ *
+ * Sets up a storage cluster, starts a ZK-based cluster controller in stand-alone mode,
+ * starts 5 dummy participants, and verifies the current states at the end.
+ */
+
+public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ String _statName = "TestStat@DB=db1";
+ String _stat = "TestStat";
+ String metricName1 = "TestMetric1";
+ String metricName2 = "TestMetric2";
+
+ String _alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)";
+ String _alertStr2 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(100)";
+
+  /**
+   * Writes one health report per participant and then pauses for a second.
+   *
+   * For each of the NODE_NR participants (named PARTICIPANT_PREFIX + "_" + port, with
+   * ports counted up from START_PORT) a record with id {@code _stat} is stored under that
+   * instance's health-report key. The record carries a "TimeStamp" simple field and one
+   * map field, keyed by {@code _statName}, holding the two metric values.
+   *
+   * @param val1 values for {@code metricName1}, indexed by participant; must hold at
+   *             least NODE_NR entries
+   * @param val2 values for {@code metricName2}, indexed by participant; must hold at
+   *             least NODE_NR entries
+   */
+  void setHealthData(int[] val1, int[] val2)
+  {
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+
+      Map<String, String> valMap = new HashMap<String, String>();
+      valMap.put(metricName1, String.valueOf(val1[i]));
+      valMap.put(metricName2, String.valueOf(val2[i]));
+
+      ZNRecord record = new ZNRecord(_stat);
+      record.setSimpleField("TimeStamp", String.valueOf(System.currentTimeMillis()));
+      record.setMapField(_statName, valMap);
+
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      helixDataAccessor.setProperty(keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
+                                    new HealthStat(record));
+    }
+
+    try
+    {
+      // NOTE(review): presumably this pause lets the reports settle before the caller
+      // runs the aggregation task - confirm the reason for the delay.
+      Thread.sleep(1000);
+    }
+    catch (InterruptedException e)
+    {
+      // Restore the interrupt flag so the caller can still observe the interruption
+      // instead of having it swallowed here.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Checks that running the aggregation task leaves the alert history absent while the
+   * cluster config has "healthChange.enabled" set to "false", and that one history
+   * record exists after the flag is set to "true" and the task is run again.
+   */
+  @Test
+  public void TestAlertDisable() throws InterruptedException
+  {
+    int[] firstMetricValues = {10, 15, 22, 24, 16};
+    int[] secondMetricValues = {22, 115, 22, 141, 16};
+    setHealthData(firstMetricValues, secondMetricValues);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager controller = _startCMResultMap.get(controllerName)._manager;
+    controller.startTimerTasks();
+
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
+
+    // Switch health-change handling off at cluster scope before the first run.
+    ConfigScope clusterScope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
+    Map<String, String> clusterConfig = new HashMap<String, String>();
+    clusterConfig.put("healthChange.enabled", "false");
+    _setupTool.getClusterManagementTool().setConfig(clusterScope, clusterConfig);
+
+    HealthStatsAggregationTask aggregation =
+        new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
+    aggregation.run();
+    Thread.sleep(100);
+    HelixDataAccessor helixDataAccessor = controller.getHelixDataAccessor();
+    Builder keys = helixDataAccessor.keyBuilder();
+
+    // Disabled: nothing must have been recorded.
+    AlertHistory recorded = controller.getHelixDataAccessor().getProperty(keys.alertHistory());
+    Assert.assertEquals(recorded, null);
+
+    // Re-enable and run again: exactly one history record is expected.
+    clusterConfig.put("healthChange.enabled", "true");
+    _setupTool.getClusterManagementTool().setConfig(clusterScope, clusterConfig);
+
+    aggregation.run();
+    Thread.sleep(100);
+
+    recorded = controller.getHelixDataAccessor().getProperty(keys.alertHistory());
+    Assert.assertNotNull(recorded);
+    Assert.assertEquals(recorded.getRecord().getMapFields().size(), 1);
+  }
+
+  /**
+   * Drives the aggregation task through several rounds of health data and checks the
+   * alert history after every round.
+   *
+   * The assertions expect that a round with unchanged data adds no record, that a round
+   * with changed data adds one record listing the alerts whose state flipped, and that
+   * the history never holds more than 30 records.
+   *
+   * The newest record is always taken as the greatest key of the history map
+   * (NOTE(review): this assumes history keys sort chronologically - confirm).
+   */
+  @Test
+  public void TestAlertHistory() throws InterruptedException
+  {
+    int[] metrics1 = {10, 15, 22, 24, 16};
+    int[] metrics2 = {22, 115, 22, 141, 16};
+    setHealthData(metrics1, metrics2);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    manager.stopTimerTasks();
+
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
+
+    // History may already contain records (e.g. written by another test in this class),
+    // so every size check below is relative to what is there now.
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    HelixProperty existing =
+        helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().alertHistory());
+    int historySize = (existing == null) ? 0 : existing.getRecord().getMapFields().size();
+
+    HealthStatsAggregationTask task = new HealthStatsAggregationTask(manager);
+
+    // First run: the four alerts that exceed their thresholds are recorded as ON.
+    ZNRecord history = runTaskAndReadHistory(task, helixDataAccessor, 100);
+    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
+    Map<String, String> lastRecord = latestHistoryRecord(history);
+    Assert.assertEquals(lastRecord.size(), 4);
+    assertAlertStatus(lastRecord, "ON",
+                      metric1AlertKey(12920), metric2AlertKey(12919),
+                      metric1AlertKey(12921), metric2AlertKey(12921));
+
+    // Same data again: no change, so no new record.
+    setHealthData(metrics1, metrics2);
+    history = runTaskAndReadHistory(task, helixDataAccessor, 100);
+    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
+    lastRecord = latestHistoryRecord(history);
+    Assert.assertEquals(lastRecord.size(), 4);
+    assertAlertStatus(lastRecord, "ON",
+                      metric1AlertKey(12920), metric2AlertKey(12919),
+                      metric1AlertKey(12921), metric2AlertKey(12921));
+
+    // Different data: a new delta should be recorded.
+    int[] metrics3 = {21, 44, 22, 14, 16};
+    int[] metrics4 = {122, 115, 222, 41, 16};
+    setHealthData(metrics3, metrics4);
+    history = runTaskAndReadHistory(task, helixDataAccessor, 100);
+    Assert.assertEquals(history.getMapFields().size(), 2 + historySize);
+    lastRecord = latestHistoryRecord(history);
+    Assert.assertEquals(lastRecord.size(), 6);
+    assertAlertStatus(lastRecord, "ON",
+                      metric1AlertKey(12918), metric2AlertKey(12918),
+                      metric1AlertKey(12919), metric2AlertKey(12920));
+    assertAlertStatus(lastRecord, "OFF",
+                      metric1AlertKey(12921), metric2AlertKey(12921));
+
+    // All zeros: every alert that was ON is reset to OFF.
+    int[] metrics5 = {0, 0, 0, 0, 0};
+    int[] metrics6 = {0, 0, 0, 0, 0};
+    setHealthData(metrics5, metrics6);
+    history = runTaskAndReadHistory(task, helixDataAccessor, 500);
+    Assert.assertEquals(history.getMapFields().size(), 3 + historySize);
+    lastRecord = latestHistoryRecord(history);
+    Assert.assertEquals(lastRecord.size(), 6);
+    assertAlertStatus(lastRecord, "OFF",
+                      metric1AlertKey(12918), metric2AlertKey(12918),
+                      metric1AlertKey(12919), metric2AlertKey(12919),
+                      metric1AlertKey(12920), metric2AlertKey(12920));
+
+    // Alternate the data every round so each round adds a record, until the history
+    // reaches its limit of 30 records.
+    for (int i = 0; i < 27; i++)
+    {
+      int x = i % 2;
+      int y = (i + 1) % 2;
+      int[] metricsx = {19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y};
+      int[] metricsy = {99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y};
+
+      setHealthData(metricsx, metricsy);
+      history = runTaskAndReadHistory(task, helixDataAccessor, 100);
+
+      Assert.assertEquals(history.getMapFields().size(), Math.min(3 + i + 1 + historySize, 30));
+      lastRecord = latestHistoryRecord(history);
+      if (i == 0)
+      {
+        // Coming from the all-OFF state, only the alerts that turn ON are in the delta.
+        Assert.assertEquals(lastRecord.size(), 6);
+        assertAlertStatus(lastRecord, "ON",
+                          metric1AlertKey(12919), metric2AlertKey(12919),
+                          metric1AlertKey(12921), metric2AlertKey(12921),
+                          metric1AlertKey(12922), metric2AlertKey(12922));
+      }
+      else
+      {
+        assertAlternatingDelta(lastRecord, x);
+      }
+    }
+
+    // Size limit is 30: further rounds replace records instead of growing the history.
+    for (int i = 0; i < 10; i++)
+    {
+      int x = i % 2;
+      int y = (i + 1) % 2;
+      int[] metricsx = {19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y};
+      int[] metricsy = {99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y};
+
+      setHealthData(metricsx, metricsy);
+      history = runTaskAndReadHistory(task, helixDataAccessor, 100);
+
+      Assert.assertEquals(history.getMapFields().size(), 30);
+      assertAlternatingDelta(latestHistoryRecord(history), x);
+    }
+  }
+
+  /** Runs the aggregation task, waits {@code settleMillis}, and reads the alert history record. */
+  private ZNRecord runTaskAndReadHistory(HealthStatsAggregationTask task,
+                                         HelixDataAccessor accessor,
+                                         long settleMillis) throws InterruptedException
+  {
+    task.run();
+    Thread.sleep(settleMillis);
+    return accessor.getProperty(accessor.keyBuilder().alertHistory()).getRecord();
+  }
+
+  /** Returns the history entry with the greatest key, which the test treats as the newest. */
+  private static Map<String, String> latestHistoryRecord(ZNRecord history)
+  {
+    TreeMap<String, Map<String, String>> sorted =
+        new TreeMap<String, Map<String, String>>(history.getMapFields());
+    return sorted.lastEntry().getValue();
+  }
+
+  /** Builds the history key of the TestMetric1 alert (threshold 20) for the given port. */
+  private static String metric1AlertKey(int port)
+  {
+    return "(localhost_" + port + ".TestStat@DB#db1.TestMetric1)GREATER(20)";
+  }
+
+  /** Builds the history key of the TestMetric2 alert (threshold 100) for the given port. */
+  private static String metric2AlertKey(int port)
+  {
+    return "(localhost_" + port + ".TestStat@DB#db1.TestMetric2)GREATER(100)";
+  }
+
+  /** Asserts that every listed alert is present in the record with the expected status. */
+  private static void assertAlertStatus(Map<String, String> record,
+                                        String expected,
+                                        String... alertKeys)
+  {
+    for (String alertKey : alertKeys)
+    {
+      // assertEquals reports a missing key as a failed comparison instead of an NPE.
+      Assert.assertEquals(record.get(alertKey), expected, "status of alert " + alertKey);
+    }
+  }
+
+  /**
+   * Checks a record written during the alternating rounds, where all ten alerts flip.
+   * With x == 0 the nodes on ports 12919, 12921 and 12922 are above both thresholds and
+   * the nodes on ports 12918 and 12920 are below; with x == 1 it is the other way round.
+   */
+  private static void assertAlternatingDelta(Map<String, String> record, int x)
+  {
+    String yDrivenStatus = (x == 0) ? "ON" : "OFF";
+    String xDrivenStatus = (x == 0) ? "OFF" : "ON";
+
+    Assert.assertEquals(record.size(), 10);
+    assertAlertStatus(record, yDrivenStatus,
+                      metric1AlertKey(12919), metric2AlertKey(12919),
+                      metric1AlertKey(12921), metric2AlertKey(12921),
+                      metric1AlertKey(12922), metric2AlertKey(12922));
+    assertAlertStatus(record, xDrivenStatus,
+                      metric1AlertKey(12918), metric2AlertKey(12918),
+                      metric1AlertKey(12920), metric2AlertKey(12920));
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
new file mode 100644
index 0000000..4d7b67b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDummyAlerts extends ZkIntegrationTestBase
+{
+ public class DummyAlertsTransition extends MockTransition
+ {
+ private final AtomicBoolean _done = new AtomicBoolean(false);
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ String instance = message.getTgtName();
+ if (_done.getAndSet(true) == false)
+ {
+ for (int i = 0; i < 5; i++)
+ {
+// System.out.println(instance + " sets healthReport: " + "mockAlerts" + i);
+ accessor.setProperty(keyBuilder.healthReport(instance, "mockAlerts"),
+ new HealthStat(new ZNRecord("mockAlerts" + i)));
+ }
+ }
+ }
+
+ }
+
+  /**
+   * Starts a controller and {@code n} mock participants that use
+   * {@link DummyAlertsTransition}, waits for the cluster to converge, and verifies that
+   * every instance's "mockAlerts" health report holds the record with id "mockAlerts4".
+   *
+   * The controller and all started participants are stopped even when a verification
+   * fails, so a failing run does not leave them connected.
+   */
+  @Test()
+  public void testDummyAlerts() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+    final int startPort = 12918;
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, startPort, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    enableHealthCheck(clusterName);
+    setupTool.getClusterManagementTool()
+             .addAlert(clusterName,
+                       "EXP(decay(1.0)(*.defaultPerfCounters@defaultPerfCounters.availableCPUs))CMP(GREATER)CON(2)");
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    try
+    {
+      // start participants; a slot is filled only once its participant has started, so
+      // the cleanup below never touches one that failed to start
+      for (int i = 0; i < n; i++)
+      {
+        String instanceName = "localhost_" + (startPort + i);
+        MockParticipant participant =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new DummyAlertsTransition());
+        participant.syncStart();
+        participants[i] = participant;
+      }
+
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                                clusterName));
+      Assert.assertTrue(result);
+
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                   clusterName));
+      Assert.assertTrue(result);
+
+      // other verifications go here
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      for (int i = 0; i < n; i++)
+      {
+        String instance = "localhost_" + (startPort + i);
+        HealthStat report = accessor.getProperty(keyBuilder.healthReport(instance, "mockAlerts"));
+        // Fail with a readable message instead of an NPE when no report was written.
+        Assert.assertNotNull(report, "no mockAlerts health report for " + instance);
+        Assert.assertEquals(report.getRecord().getId(), "mockAlerts4");
+      }
+
+      Thread.sleep(1000);
+    }
+    finally
+    {
+      // clean up
+      controller.syncStop();
+      for (MockParticipant participant : participants)
+      {
+        if (participant != null)
+        {
+          participant.syncStop();
+        }
+      }
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
new file mode 100644
index 0000000..6b948f3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -0,0 +1,195 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestExpandAlert extends ZkIntegrationTestBase
+{
+ ZkClient _zkClient;
+ protected ClusterSetup _setupTool = null;
+ protected final String _alertStr = "EXP(decay(1.0)(localhost_*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(16)";
+ protected final String _alertStatusStr = _alertStr+" : (localhost_12918.RestQueryStats@DBName=TestDB0.latency)";
+ protected final String _dbName = "TestDB0";
+
+  /**
+   * Creates the ZooKeeper client, configured with a {@link ZNRecordSerializer}, and the
+   * cluster setup tool used by the test in this class. Both connect to ZK_ADDR.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    // client used later to read the alert status back
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // admin entry point used to register the alert
+    _setupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  /**
+   * Closes the ZooKeeper client once the tests of this class have run.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    // release the client connection held in _zkClient
+    _zkClient.close();
+  }
+
+ public class ExpandAlertTransition extends MockTransition
+ {
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ String instance = message.getTgtName();
+ String partition = message.getPartitionName();
+
+ if (fromState.equalsIgnoreCase("SLAVE")
+ && toState.equalsIgnoreCase("MASTER"))
+ {
+
+ //add a stat and report to ZK
+ //perhaps should keep reporter per instance...
+ ParticipantHealthReportCollectorImpl reporter =
+ new ParticipantHealthReportCollectorImpl(manager, instance);
+ MockEspressoHealthReportProvider provider = new
+ MockEspressoHealthReportProvider();
+ reporter.addHealthReportProvider(provider);
+ String statName = "latency";
+ provider.setStat(_dbName, statName,"15");
+ reporter.transmitHealthReports();
+
+
+ /*
+ for (int i = 0; i < 5; i++)
+ {
+ accessor.setProperty(PropertyType.HEALTHREPORT,
+ new ZNRecord("mockAlerts" + i),
+ instance,
+ "mockAlerts");
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ */
+ }
+ }
+
+ }
+
+  /**
+   * Registers a wildcard alert ({@code _alertStr}), starts a controller and five mock
+   * participants that report a latency of 15, runs the health aggregation task by hand,
+   * and verifies that the alert status expanded for localhost_12918
+   * ({@code _alertStatusStr}) carries the value 15.0 and has not fired.
+   */
+  @Test()
+  public void testExpandAlert() throws Exception
+  {
+    final int nodeCount = 5;
+    final int startPort = 12918;
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[nodeCount];
+
+    System.out.println("START TestExpandAlert at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            startPort, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            nodeCount, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+    // NOTE(review): neither the controller nor the participants are stopped at the end
+    // of this test; consider adding cleanup once the right stop calls are confirmed.
+    StartCMResult cmResult = TestHelper.startController(clusterName,
+                                                        "controller_0",
+                                                        ZK_ADDR,
+                                                        HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < nodeCount; i++)
+    {
+      String instanceName = "localhost_" + (startPort + i);
+
+      participants[i] = new MockParticipant(clusterName,
+                                            instanceName,
+                                            ZK_ADDR,
+                                            new ExpandAlertTransition());
+      participants[i].start();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    Thread.sleep(1000);
+    // HealthAggregationTask is supposed to run by a timer every 30s
+    // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    // sleep for a few seconds to give stats stage time to trigger
+    Thread.sleep(3000);
+
+    // other verifications go here
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    Map<String, Map<String, String>> recMap = record.getMapFields();
+    Map<String, String> alertStatusMap = recMap.get(_alertStatusStr);
+    // Fail with a readable message instead of an NPE when the alert was not expanded.
+    Assert.assertNotNull(alertStatusMap, "no alert status recorded for " + _alertStatusStr);
+
+    String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
+    boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
+    Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("15.0"));
+    Assert.assertFalse(fired);
+
+    System.out.println("END TestExpandAlert at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
new file mode 100644
index 0000000..7f85b35
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for a single, fully qualified (non-wildcard) alert.
+ *
+ * The test sets up a five-node MasterSlave cluster, lets every participant report
+ * a "latency" stat for TestDB0 on each SLAVE-to-MASTER transition, runs the
+ * health-stats aggregation once by hand and then checks the alert status and the
+ * alert history records read back through a ZKHelixDataAccessor.
+ */
+public class TestSimpleAlert extends ZkIntegrationTestBase
+{
+  ZkClient _zkClient;
+  protected ClusterSetup _setupTool = null;
+  // Alert expression registered by the test (latency of TestDB0 on localhost_12918, GREATER, 10).
+  protected final String _alertStr = "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
+  // Key used to look up this alert in the map fields of the alert-status record.
+  protected final String _alertStatusStr = _alertStr;
+  protected final String _dbName = "TestDB0";
+
+  /**
+   * Opens the ZooKeeper client (with the ZNRecord serializer) and the cluster setup tool.
+   */
+  @BeforeClass ()
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  /**
+   * Closes the ZooKeeper client opened in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  /**
+   * Mock transition that, on every SLAVE-to-MASTER transition, reports a "latency"
+   * stat of {@code 0.1 + value} for {@code _dbName} through a health report collector.
+   */
+  public class SimpleAlertTransition extends MockTransition
+  {
+    int _alertValue;
+
+    /**
+     * @param value integer part of the latency to report; 0.1 is added when reporting
+     */
+    public SimpleAlertTransition(int value)
+    {
+      _alertValue = value;
+    }
+
+    /**
+     * Reports the configured latency stat when the message describes a
+     * SLAVE-to-MASTER transition; every other transition is a no-op.
+     *
+     * @param message transition message carrying from/to state and target instance
+     * @param context notification context supplying the HelixManager
+     */
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      // Constant-first comparison so a missing state cannot raise a NullPointerException.
+      boolean promotedToMaster = "SLAVE".equalsIgnoreCase(message.getFromState())
+          && "MASTER".equalsIgnoreCase(message.getToState());
+      if (!promotedToMaster)
+      {
+        return;
+      }
+
+      HelixManager manager = context.getManager();
+      String instance = message.getTgtName();
+
+      // Add a stat and report it to ZK.
+      // A fresh reporter is created per transition; perhaps one should be kept per instance.
+      ParticipantHealthReportCollectorImpl reporter =
+          new ParticipantHealthReportCollectorImpl(manager, instance);
+      MockEspressoHealthReportProvider provider = new MockEspressoHealthReportProvider();
+      reporter.addHealthReportProvider(provider);
+      provider.setStat(_dbName, "latency", "" + (0.1 + _alertValue));
+      reporter.transmitHealthReports();
+    }
+  }
+
+  /**
+   * Registers the alert, starts five participants that each report a latency of
+   * 15.1, triggers one aggregation run and verifies that the alert for
+   * localhost_12918 carries the value 15.1, is fired, and shows up as "ON" in the
+   * first alert-history delta.
+   */
+  @Test()
+  public void testSimpleAlert() throws Exception
+  {
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            12918,         // participant start port
+                            "localhost",   // participant name prefix
+                            "TestDB",      // resource name prefix
+                            1,             // resources
+                            10,            // partitions per resource
+                            5,             // number of nodes
+                            3,             // replicas
+                            "MasterSlave",
+                            true);         // do rebalance
+
+    StartCMResult cmResult = TestHelper.startController(clusterName,
+                                                        "controller_0",
+                                                        ZK_ADDR,
+                                                        HelixControllerMain.STANDALONE);
+    cmResult._manager.startTimerTasks();
+    _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+    // start participants
+    for (int i = 0; i < participants.length; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName,
+                                            instanceName,
+                                            ZK_ADDR,
+                                            new SimpleAlertTransition(15));
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // HealthAggregationTask is supposed to run by a timer every 30s.
+    // To make sure HealthAggregationTask is run, we invoke it explicitly for this test.
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    // sleep for a few seconds to give stats stage time to trigger
+    Thread.sleep(3000);
+
+    // Verify the alert status record.
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    Map<String, Map<String,String>> recMap = record.getMapFields();
+    Map<String,String> alertStatusMap = recMap.get(_alertStatusStr);
+    // Fail with a readable message instead of a NullPointerException when the status is missing.
+    Assert.assertNotNull(alertStatusMap, "No alert status recorded for " + _alertStatusStr);
+    String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
+    boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
+    Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("15.1"));
+    Assert.assertTrue(fired);
+
+    // Verify alert history from ZK.
+    ZNRecord alertHistory = accessor.getProperty(keyBuilder.alertHistory()).getRecord();
+
+    String deltakey = (String) (alertHistory.getMapFields().keySet().toArray()[0]);
+    Map<String, String> delta = alertHistory.getMapField(deltakey);
+    Assert.assertEquals(delta.size(), 1);
+    Assert.assertEquals(
+        delta.get("EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName#TestDB0.latency))CMP(GREATER)CON(10)--(%)"),
+        "ON");
+
+    System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
new file mode 100644
index 0000000..9e1da02
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -0,0 +1,256 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for a wildcard alert that matches every participant.
+ *
+ * Five participants (localhost_12944 .. localhost_12948) report latencies of
+ * {@code i * 5 + 0.1}. The test checks which per-instance alerts fire for a
+ * threshold of 10, then drops the alert, adds the same expression with a
+ * threshold of 15 and checks status and history again.
+ */
+public class TestSimpleWildcardAlert extends ZkIntegrationTestBase
+{
+  // First participant port; participant i is named "localhost_" + (START_PORT + i).
+  private static final int START_PORT = 12944;
+
+  ZkClient _zkClient;
+  protected ClusterSetup _setupTool = null;
+  protected final String _alertStr = "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
+  protected final String _alertStatusStr = _alertStr;
+  protected final String _dbName = "TestDB0";
+
+  /**
+   * Opens the ZooKeeper client (with the ZNRecord serializer) and the cluster setup tool.
+   */
+  @BeforeClass ()
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  /**
+   * Closes the ZooKeeper client opened in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  /**
+   * Mock transition that, on every SLAVE-to-MASTER transition, reports a "latency"
+   * stat of {@code 0.1 + value} for {@code _dbName} through a health report collector.
+   */
+  public class SimpleAlertTransition extends MockTransition
+  {
+    int _alertValue;
+
+    /**
+     * @param value integer part of the latency to report; 0.1 is added when reporting
+     */
+    public SimpleAlertTransition(int value)
+    {
+      _alertValue = value;
+    }
+
+    /**
+     * Reports the configured latency stat when the message describes a
+     * SLAVE-to-MASTER transition; every other transition is a no-op.
+     *
+     * @param message transition message carrying from/to state and target instance
+     * @param context notification context supplying the HelixManager
+     */
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      // Constant-first comparison so a missing state cannot raise a NullPointerException.
+      boolean promotedToMaster = "SLAVE".equalsIgnoreCase(message.getFromState())
+          && "MASTER".equalsIgnoreCase(message.getToState());
+      if (!promotedToMaster)
+      {
+        return;
+      }
+
+      HelixManager manager = context.getManager();
+      String instance = message.getTgtName();
+
+      // Add a stat and report it to ZK.
+      // A fresh reporter is created per transition; perhaps one should be kept per instance.
+      ParticipantHealthReportCollectorImpl reporter =
+          new ParticipantHealthReportCollectorImpl(manager, instance);
+      MockEspressoHealthReportProvider provider = new MockEspressoHealthReportProvider();
+      reporter.addHealthReportProvider(provider);
+      provider.setStat(_dbName, "latency", "" + (0.1 + _alertValue));
+      reporter.transmitHealthReports();
+    }
+  }
+
+  /** Returns the name of the participant with the given index. */
+  private static String instanceName(int index)
+  {
+    return "localhost_" + (START_PORT + index);
+  }
+
+  /**
+   * Asserts value and fired flag of the per-instance alert status for the
+   * participants with index {@code from} (inclusive) to {@code to} (exclusive).
+   */
+  private static void verifyAlertStatus(Map<String, Map<String,String>> recMap,
+                                        String alert,
+                                        int from,
+                                        int to,
+                                        boolean expectFired)
+  {
+    for (int i = from; i < to; i++)
+    {
+      String statusKey = alert + " : (" + instanceName(i) + ".RestQueryStats@DBName=TestDB0.latency)";
+      Map<String,String> alertStatusMap = recMap.get(statusKey);
+      // Fail with a readable message instead of a NullPointerException when the status is missing.
+      Assert.assertNotNull(alertStatusMap, "No alert status recorded for " + statusKey);
+      String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
+      boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
+      Assert.assertEquals(Double.parseDouble(val), (double)i * 5 + 0.1);
+      Assert.assertEquals(fired, expectFired, "Unexpected fired flag for " + statusKey);
+    }
+  }
+
+  /**
+   * Asserts that the history delta marks the alerts of the participants with
+   * index {@code from} (inclusive) to {@code to} (exclusive) as "ON".
+   */
+  private static void verifyHistoryDelta(Map<String, String> delta, int from, int to, int threshold)
+  {
+    for (int i = from; i < to; i++)
+    {
+      String historyKey = "(" + instanceName(i) + ".RestQueryStats@DBName#TestDB0.latency)GREATER(" + threshold + ")";
+      Assert.assertEquals(delta.get(historyKey), "ON");
+    }
+  }
+
+  /**
+   * Runs the wildcard alert scenario: threshold 10 first (participants 2..4 fire),
+   * then threshold 15 after dropping and re-adding the alert (participants 3..4 fire).
+   */
+  @Test()
+  public void testSimpleWildcardAlert() throws Exception
+  {
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    System.out.println("START testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            START_PORT,    // participant start port
+                            "localhost",   // participant name prefix
+                            "TestDB",      // resource name prefix
+                            1,             // resources
+                            10,            // partitions per resource
+                            5,             // number of nodes
+                            3,             // replicas
+                            "MasterSlave",
+                            true);         // do rebalance
+
+    StartCMResult cmResult = TestHelper.startController(clusterName,
+                                                        "controller_0",
+                                                        ZK_ADDR,
+                                                        HelixControllerMain.STANDALONE);
+    // Timer tasks are stopped; the aggregation task is run explicitly below.
+    cmResult._manager.stopTimerTasks();
+
+    String alertwildcard = "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
+
+    _setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
+    // start participants
+    for (int i = 0; i < participants.length; i++)
+    {
+      participants[i] = new MockParticipant(clusterName,
+                                            instanceName(i),
+                                            ZK_ADDR,
+                                            new SimpleAlertTransition(i * 5));
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    Thread.sleep(1000);
+    // HealthAggregationTask is supposed to run by a timer every 30s.
+    // To make sure HealthAggregationTask is run, we invoke it explicitly for this test.
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    // sleep to give stats stage time to trigger
+    Thread.sleep(1000);
+
+    // Threshold 10: participants 0..1 stay below it, participants 2..4 exceed it.
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    Map<String, Map<String,String>> recMap = record.getMapFields();
+    verifyAlertStatus(recMap, alertwildcard, 0, 2, false);
+    verifyAlertStatus(recMap, alertwildcard, 2, 5, true);
+
+    ZNRecord alertHistory = accessor.getProperty(keyBuilder.alertHistory()).getRecord();
+    // NOTE(review): picking deltas by position assumes the map-field keys are
+    // returned in insertion/chronological order - confirm against ZNRecord.
+    String deltakey = (String) (alertHistory.getMapFields().keySet().toArray()[0]);
+    Map<String, String> delta = alertHistory.getMapField(deltakey);
+    Assert.assertEquals(delta.size() , 3);
+    verifyHistoryDelta(delta, 2, 5, 10);
+
+    // Drop and add another alert
+    _setupTool.getClusterManagementTool().dropAlert(clusterName, alertwildcard);
+    alertwildcard = "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
+    _setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    Thread.sleep(1000);
+
+    // Threshold 15: participants 0..2 stay below it, participants 3..4 exceed it.
+    record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    recMap = record.getMapFields();
+    verifyAlertStatus(recMap, alertwildcard, 0, 3, false);
+    verifyAlertStatus(recMap, alertwildcard, 3, 5, true);
+
+    alertHistory = accessor.getProperty(keyBuilder.alertHistory()).getRecord();
+    deltakey = (String) (alertHistory.getMapFields().keySet().toArray()[1]);
+    delta = alertHistory.getMapField(deltakey);
+    Assert.assertEquals(delta.size(), 2);
+    verifyHistoryDelta(delta, 3, 5, 15);
+
+    System.out.println("END testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
new file mode 100644
index 0000000..2289661
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for an alert on the "reportingage" stat of every participant.
+ *
+ * The test registers the alert, starts a five-node MasterSlave cluster whose
+ * participants report a latency stat on each SLAVE-to-MASTER transition, runs
+ * the health-stats aggregation once by hand and checks that an alert status
+ * entry exists for localhost_12918.
+ */
+public class TestStalenessAlert extends ZkIntegrationTestBase
+{
+  ZkClient _zkClient;
+  protected ClusterSetup _setupTool = null;
+  protected final String _alertStr = "EXP(decay(1)(localhost_*.reportingage))CMP(GREATER)CON(600)";
+  // Key of the per-instance status entry that the test looks up for localhost_12918.
+  protected final String _alertStatusStr = _alertStr+" : (localhost_12918.reportingage)";
+  protected final String _dbName = "TestDB0";
+
+  /**
+   * Opens the ZooKeeper client (with the ZNRecord serializer) and the cluster setup tool.
+   */
+  @BeforeClass ()
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  /**
+   * Closes the ZooKeeper client opened in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  /**
+   * Mock transition that, on every SLAVE-to-MASTER transition, reports a "latency"
+   * stat of "15" for {@code _dbName} through a health report collector.
+   */
+  public class StalenessAlertTransition extends MockTransition
+  {
+    /**
+     * Reports the latency stat when the message describes a SLAVE-to-MASTER
+     * transition; every other transition is a no-op.
+     *
+     * @param message transition message carrying from/to state and target instance
+     * @param context notification context supplying the HelixManager
+     */
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      // Constant-first comparison so a missing state cannot raise a NullPointerException.
+      boolean promotedToMaster = "SLAVE".equalsIgnoreCase(message.getFromState())
+          && "MASTER".equalsIgnoreCase(message.getToState());
+      if (!promotedToMaster)
+      {
+        return;
+      }
+
+      HelixManager manager = context.getManager();
+      String instance = message.getTgtName();
+
+      // Add a stat and report it to ZK.
+      // A fresh reporter is created per transition; perhaps one should be kept per instance.
+      ParticipantHealthReportCollectorImpl reporter =
+          new ParticipantHealthReportCollectorImpl(manager, instance);
+      MockEspressoHealthReportProvider provider = new MockEspressoHealthReportProvider();
+      reporter.addHealthReportProvider(provider);
+      provider.setStat(_dbName, "latency", "15");
+      reporter.transmitHealthReports();
+    }
+  }
+
+  /**
+   * Runs the staleness scenario and verifies that the alert-status record
+   * contains an entry for {@code _alertStatusStr}. The value and fired flag of
+   * that entry are not asserted (those checks were already disabled in this test).
+   */
+  @Test()
+  public void testStalenessAlert() throws Exception
+  {
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    System.out.println("START TestStalenessAlert at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            12918,         // participant start port
+                            "localhost",   // participant name prefix
+                            "TestDB",      // resource name prefix
+                            1,             // resources
+                            10,            // partitions per resource
+                            5,             // number of nodes
+                            3,             // replicas
+                            "MasterSlave",
+                            true);         // do rebalance
+
+    _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+    StartCMResult cmResult = TestHelper.startController(clusterName,
+                                                        "controller_0",
+                                                        ZK_ADDR,
+                                                        HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < participants.length; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName,
+                                            instanceName,
+                                            ZK_ADDR,
+                                            new StalenessAlertTransition());
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // HealthAggregationTask is supposed to run by a timer every 30s.
+    // To make sure HealthAggregationTask is run, we invoke it explicitly for this test.
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    // sleep for a few seconds to give stats stage time to trigger
+    Thread.sleep(3000);
+
+    // Verify that a status entry was produced for the alert.
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    Map<String, Map<String,String>> recMap = record.getMapFields();
+    Map<String,String> alertStatusMap = recMap.get(_alertStatusStr);
+    // Previously a missing entry only surfaced as a NullPointerException; assert it explicitly.
+    Assert.assertNotNull(alertStatusMap, "No alert status recorded for " + _alertStatusStr);
+
+    System.out.println("END TestStalenessAlert at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
new file mode 100644
index 0000000..6d60260
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -0,0 +1,301 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestWildcardAlert extends ZkIntegrationTestBase
+{
+ /**
+  * MBean observer used by the test to keep a snapshot of the attribute values
+  * of every MBean that gets registered in the observed domain.
+  *
+  * The snapshot is stored in {@link #_beanValueMap}, keyed by the MBean's object
+  * name string and then by attribute name.
+  */
+ public static class TestClusterMBeanObserver extends ClusterMBeanObserver
+ {
+   public Map<String, Map<String, Object>> _beanValueMap = new HashMap<String, Map<String, Object>>();
+
+   /**
+    * @param domain JMX domain passed on to the ClusterMBeanObserver constructor
+    */
+   public TestClusterMBeanObserver(String domain) throws InstanceNotFoundException, IOException,
+       MalformedObjectNameException, NullPointerException
+   {
+     super(domain);
+   }
+
+   /**
+    * Records the attribute values of the newly registered MBean. Failures are
+    * logged and otherwise ignored so that a bad bean does not break the observer.
+    */
+   @Override
+   public void onMBeanRegistered(MBeanServerConnection server,
+       MBeanServerNotification mbsNotification)
+   {
+     try
+     {
+       loadAttributes(mbsNotification.getMBeanName());
+     } catch (Exception e)
+     {
+       _logger.error("Error getting bean info, domain=" + _domain, e);
+     }
+   }
+
+   /**
+    * Drops the recorded attribute values of the unregistered MBean.
+    */
+   @Override
+   public void onMBeanUnRegistered(MBeanServerConnection server,
+       MBeanServerNotification mbsNotification)
+   {
+     _beanValueMap.remove(mbsNotification.getMBeanName().toString());
+   }
+
+   /**
+    * Re-reads the attribute values of every MBean currently present in
+    * {@link #_beanValueMap}.
+    */
+   public void refresh() throws MalformedObjectNameException, NullPointerException, InstanceNotFoundException, IntrospectionException, ReflectionException, IOException, AttributeNotFoundException, MBeanException
+   {
+     // Iterate over a snapshot of the keys: loadAttributes() writes to the map.
+     for (String beanName : _beanValueMap.keySet().toArray(new String[0]))
+     {
+       loadAttributes(new ObjectName(beanName));
+     }
+   }
+
+   /**
+    * Replaces the recorded attributes of the given MBean with freshly read values.
+    * Each attribute is fetched once, so the printed value and the stored value
+    * are always the same object.
+    */
+   private void loadAttributes(ObjectName objName) throws InstanceNotFoundException, IntrospectionException, ReflectionException, IOException, AttributeNotFoundException, MBeanException
+   {
+     MBeanInfo info = _server.getMBeanInfo(objName);
+     MBeanAttributeInfo[] infos = info.getAttributes();
+     Map<String, Object> attributes = new HashMap<String, Object>();
+     _beanValueMap.put(objName.toString(), attributes);
+     for (MBeanAttributeInfo infoItem : infos)
+     {
+       Object val = _server.getAttribute(objName, infoItem.getName());
+       System.out.println(" " + infoItem.getName() + " : " + val + " type : " + infoItem.getType());
+       attributes.put(infoItem.getName(), val);
+     }
+   }
+
+ }
+
+ // Logger of the test class; also used by the nested TestClusterMBeanObserver.
+ private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
+ // ZooKeeper client, created in beforeClass() and closed in afterClass().
+ ZkClient _zkClient;
+ // Cluster setup tool, created in beforeClass().
+ protected ClusterSetup _setupTool = null;
+ // Alert expression registered by the test.
+ protected final String _alertStr = "EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
+ // Alert status key; currently identical to the alert expression (the suffix is disabled).
+ protected final String _alertStatusStr = _alertStr; // +" : (*)";
+ // Database name the mock participants report their stats for.
+ protected final String _dbName = "TestDB0";
+
+ /**
+  * Creates the ZooKeeper client (using the ZNRecord serializer) and the cluster
+  * setup tool, both pointed at ZK_ADDR.
+  */
+ @BeforeClass()
+ public void beforeClass() throws Exception
+ {
+   this._zkClient = new ZkClient(ZK_ADDR);
+   this._zkClient.setZkSerializer(new ZNRecordSerializer());
+   this._setupTool = new ClusterSetup(ZK_ADDR);
+ }
+
+ /**
+  * Closes the ZooKeeper client once all tests of the class have run.
+  */
+ @AfterClass
+ public void afterClass()
+ {
+   this._zkClient.close();
+ }
+
+ public class WildcardAlertTransition extends MockTransition
+ {
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ String instance = message.getTgtName();
+ String partition = message.getPartitionName();
+
+ if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
+ {
+ //add a stat and report to ZK
+ //perhaps should keep reporter per instance...
+ ParticipantHealthReportCollectorImpl reporter =
+ new ParticipantHealthReportCollectorImpl(manager, instance);
+ MockEspressoHealthReportProvider provider = new
+ MockEspressoHealthReportProvider();
+ reporter.addHealthReportProvider(provider);
+ String statName = "latency";
+ //using constant as timestamp so that when each partition does this transition,
+ //they do not advance timestamp, and no stats double-counted
+ String timestamp = "12345";
+ provider.setStat(_dbName, statName,"15", timestamp);
+
+
+ //sleep for random time and see about errors.
+ /*
+ Random r = new Random();
+ int x = r.nextInt(30000);
+ try {
+ Thread.sleep(x);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ */
+
+ reporter.transmitHealthReports();
+
+ /*
+ for (int i = 0; i < 5; i++)
+ {
+ accessor.setProperty(PropertyType.HEALTHREPORT,
+ new ZNRecord("mockAlerts" + i),
+ instance,
+ "mockAlerts");
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ */
+ }
+ }
+
+ }
+
+ /**
+  * Verifies that a wildcard alert is evaluated and exposed over JMX.
+  *
+  * The test sets up a 5-node MasterSlave cluster, registers {@code _alertStr}, starts a
+  * controller and participants that use {@link WildcardAlertTransition}, runs the health
+  * stats aggregation once by hand, and then checks both the alert status stored in
+  * ZooKeeper and the attributes of the corresponding alert MBean.
+  *
+  * @throws Exception if cluster setup, the participants, or the sleeps fail
+  */
+ @Test()
+ public void testWildcardAlert() throws Exception
+ {
+   // Single source of truth for the cluster size (array length, setup, participant loop).
+   final int nodeCount = 5;
+   String clusterName = getShortClassName();
+   MockParticipant[] participants = new MockParticipant[nodeCount];
+
+   System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
+
+   TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+       "localhost", // participant name prefix
+       "TestDB", // resource name prefix
+       1, // resources
+       10, // partitions per resource
+       nodeCount, // number of nodes
+       3, // replicas
+       "MasterSlave", true); // do rebalance
+
+   _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+   StartCMResult cmResult = TestHelper.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+   // start participants
+   for (int i = 0; i < participants.length; i++)
+   {
+     String instanceName = "localhost_" + (12918 + i);
+
+     participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR,
+         new WildcardAlertTransition());
+     participants[i].syncStart();
+   }
+
+   TestClusterMBeanObserver jmxMBeanObserver = new TestClusterMBeanObserver(
+       ClusterAlertMBeanCollection.DOMAIN_ALERT);
+
+   boolean result = ClusterStateVerifier.verifyByPolling(
+       new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+   Assert.assertTrue(result);
+   Thread.sleep(3000);
+   // HealthAggregationTask is supposed to run by a timer every 30s
+   // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
+   new HealthStatsAggregationTask(cmResult._manager).run();
+
+   // sleep for a few seconds to give stats stage time to trigger and for bean to trigger
+   Thread.sleep(3000);
+
+   ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+   Builder keyBuilder = accessor.keyBuilder();
+
+   ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+   Map<String, Map<String, String>> recMap = record.getMapFields();
+   Map<String, String> alertStatusMap = recMap.get(_alertStatusStr);
+   // Fail with a readable message instead of a bare NullPointerException when the alert
+   // was never evaluated.
+   Assert.assertNotNull(alertStatusMap, "no alert status recorded for " + _alertStatusStr);
+   String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
+   boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
+   // NOTE(review): 75 looks like 5 participants x the constant stat value 15 reported by
+   // WildcardAlertTransition -- confirm against the SUMEACH semantics.
+   Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("75.0"));
+   Assert.assertTrue(fired);
+
+   // Make sure that the jmxObserver has received all the jmx bean value that is corresponding
+   // to the alerts.
+   jmxMBeanObserver.refresh();
+   Assert.assertTrue(jmxMBeanObserver._beanValueMap.size() >= 1);
+
+   // The sensor name is also the tail of the MBean name, so build both from one literal.
+   String sensorName = "EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)";
+   String beanName = "HelixAlerts:alert=" + sensorName;
+   Assert.assertTrue(jmxMBeanObserver._beanValueMap.containsKey(beanName));
+
+   Map<String, Object> beanValueMap = jmxMBeanObserver._beanValueMap.get(beanName);
+   Assert.assertEquals(beanValueMap.size(), 4);
+   Assert.assertEquals((beanValueMap.get("AlertFired")), Integer.valueOf(1));
+   Assert.assertEquals((beanValueMap.get("AlertValue")), Double.valueOf(75.0));
+   Assert.assertEquals((String) (beanValueMap.get("SensorName")), sensorName);
+
+   System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/FileCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/FileCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/FileCMTestBase.java
new file mode 100644
index 0000000..0d85dcd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/FileCMTestBase.java
@@ -0,0 +1,208 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.file.FileHelixAdmin;
+import org.apache.helix.mock.storage.DummyProcess;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * Test base for the dynamic file-based cluster manager.
+ *
+ * Before the tests of a subclass run, it builds a cluster in a {@link FilePropertyStore}
+ * rooted at {@code /tmp/<short class name>}, starts {@code NODE_NR} dummy participants and a
+ * controller, and waits until the cluster state verifier passes. Afterwards it disconnects
+ * the controller manager.
+ *
+ * @author zzhang
+ *
+ */
+public class FileCMTestBase
+{
+  private static final Logger logger =
+      Logger.getLogger(FileCMTestBase.class);
+  // Cluster name and store root are derived from the concrete test class name.
+  protected final String CLUSTER_NAME = "CLUSTER_"
+      + getShortClassName();
+  private static final String TEST_DB = "TestDB";
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  final String ROOT_PATH = "/tmp/"
+      + getShortClassName();
+
+  protected final FilePropertyStore<ZNRecord> _fileStore =
+      new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                      ROOT_PATH,
+                                      new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+  // Controller-side manager; created and connected in beforeClass().
+  protected HelixManager _manager;
+  protected HelixAdmin _mgmtTool;
+
+  /**
+   * Sets up the test cluster, starts the dummy participants and the controller, and asserts
+   * that the cluster state verifier passes.
+   *
+   * Failures while starting a participant or the controller are logged rather than
+   * rethrown; the verifier assertion at the end is what fails the setup in that case.
+   *
+   * @throws Exception if cluster setup or verification throws
+   */
+  @BeforeClass()
+  public void beforeClass() throws Exception
+  {
+    System.out.println("START BEFORECLASS FileCMTestBase at "
+        + new Date(System.currentTimeMillis()));
+
+    // setup test cluster
+    _mgmtTool = new FileHelixAdmin(_fileStore);
+    _mgmtTool.addCluster(CLUSTER_NAME, true);
+
+    StateModelConfigGenerator generator = new StateModelConfigGenerator();
+    _mgmtTool.addStateModelDef(CLUSTER_NAME, "LeaderStandby",
+        new StateModelDefinition(generator.generateConfigForLeaderStandby()));
+
+    _mgmtTool.addStateModelDef(CLUSTER_NAME,
+                               "OnlineOffline",
+                               new StateModelDefinition(generator.generateConfigForOnlineOffline()));
+    _mgmtTool.addResource(CLUSTER_NAME, TEST_DB, 10, STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      addNodeToCluster(CLUSTER_NAME, "localhost", START_PORT + i);
+    }
+    rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+
+    // start dummy storage nodes
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      DummyProcess process =
+          new DummyProcess(null,
+                           CLUSTER_NAME,
+                           "localhost_" + (START_PORT + i),
+                           "dynamic-file",
+                           null,
+                           0,
+                           _fileStore);
+      try
+      {
+        process.start();
+      }
+      catch (Exception e)
+      {
+        logger.error("fail to start dummy participant using dynmaic file-based cluster-manager",
+                     e);
+      }
+    }
+
+    // Create the controller manager exactly once, after all participants were started.
+    // It used to be re-created on every iteration of the loop above, which discarded all
+    // but the last instance.
+    _manager =
+        HelixManagerFactory.getDynamicFileHelixManager(CLUSTER_NAME,
+                                                       "controller_0",
+                                                       InstanceType.CONTROLLER,
+                                                       _fileStore);
+
+    // start cluster manager controller
+    GenericHelixController controller = new GenericHelixController();
+    try
+    {
+      // manager.addConfigChangeListener(controller);
+      _manager.addLiveInstanceChangeListener(controller);
+      _manager.addIdealStateChangeListener(controller);
+      // manager.addExternalViewChangeListener(controller);
+      _manager.connect();
+    }
+    catch (Exception e)
+    {
+      logger.error("fail to start controller using dynamic file-based cluster-manager ",
+                   e);
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewFileVerifier(ROOT_PATH,
+                                                                                                     CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    System.out.println("END BEFORECLASS FileCMTestBase at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Disconnects the controller manager. The second call is intentional: it checks that
+   * {@code disconnect()} can be invoked twice.
+   *
+   * @throws Exception if disconnecting throws
+   */
+  @AfterClass()
+  public void afterClass() throws Exception
+  {
+    logger.info("START afterClass FileCMTestBase shutting down file-based cluster managers at "
+        + new Date(System.currentTimeMillis()));
+
+    // Thread.sleep(3000);
+    // _store.stop();
+    _manager.disconnect();
+    _manager.disconnect(); // test if disconnect() can be called twice
+
+    logger.info("END afterClass FileCMTestBase at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Returns the runtime class name without its package prefix.
+   */
+  private String getShortClassName()
+  {
+    String className = this.getClass().getName();
+    return className.substring(className.lastIndexOf('.') + 1);
+  }
+
+  /**
+   * Registers an enabled instance named {@code host_port} with the given cluster.
+   *
+   * @param clusterName cluster to add the instance to
+   * @param host        instance host name
+   * @param port        instance port
+   */
+  private void addNodeToCluster(String clusterName, String host, int port)
+  {
+    // TODO use ClusterSetup
+    String nodeId = host + "_" + port;
+    ZNRecord nodeConfig = new ZNRecord(nodeId);
+    nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), host);
+    nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(),
+                              Integer.toString(port));
+    nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(),
+                              Boolean.toString(true));
+    // Honor the clusterName argument; it was previously ignored in favor of CLUSTER_NAME.
+    _mgmtTool.addInstance(clusterName, new InstanceConfig(nodeConfig));
+  }
+
+  /**
+   * Recomputes the ideal state of a resource over all instances currently in the cluster
+   * and stores it.
+   *
+   * @param clusterName  cluster that owns the resource
+   * @param resourceName resource to rebalance
+   * @param replica      replica count written to the ideal state; {@code replica - 1} is
+   *                     what gets passed on to the ideal state calculator
+   */
+  protected void rebalanceStorageCluster(String clusterName,
+                                         String resourceName,
+                                         int replica)
+  {
+    List<String> nodeNames = _mgmtTool.getInstancesInCluster(clusterName);
+
+    IdealState idealState = _mgmtTool.getResourceIdealState(clusterName, resourceName);
+    idealState.setReplicas(Integer.toString(replica));
+    int partitions = idealState.getNumPartitions();
+
+    ZNRecord newIdealState =
+        IdealStateCalculatorForStorageNode.calculateIdealState(nodeNames,
+                                                               partitions,
+                                                               replica - 1,
+                                                               resourceName,
+                                                               "MASTER",
+                                                               "SLAVE");
+
+    // Merge the existing ideal-state record (including the replica setting above) into
+    // the freshly calculated one before writing it back.
+    newIdealState.merge(idealState.getRecord());
+    _mgmtTool.setResourceIdealState(clusterName,
+                                    resourceName,
+                                    new IdealState(newIdealState));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/IntegrationTest.java b/helix-core/src/test/java/org/apache/helix/integration/IntegrationTest.java
new file mode 100644
index 0000000..5e1a68f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/IntegrationTest.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.testng.annotations.Test;
+
+/**
+ * A simple integration test, to be used until there is a framework that makes
+ * integration tests easy to write. The test method itself only prints a start and an end
+ * marker.
+ *
+ * @author kgopalak
+ *
+ */
+public class IntegrationTest extends ZkStandAloneCMTestBase
+{
+  /**
+   * Prints timestamped START/END markers.
+   *
+   * @throws Exception declared for the disabled blocking call below
+   */
+  @Test
+  public void integrationTest() throws Exception
+  {
+    final Date startedAt = new Date(System.currentTimeMillis());
+    System.out.println("START IntegrationTest at " + startedAt);
+    // Disabled: would block this thread indefinitely.
+    // Thread.currentThread().join();
+    final Date finishedAt = new Date(System.currentTimeMillis());
+    System.out.println("END IntegrationTest at " + finishedAt);
+  }
+}