You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[30/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
new file mode 100644
index 0000000..a839353
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link CurrentStateComputationStage}: each test runs
+ * {@link ReadClusterDataStage} followed by the stage under test and inspects
+ * the resulting {@link CurrentStateOutput} stored on the event.
+ */
+public class TestCurrentStateComputationStage extends BaseStageTest
+{
+
+  /**
+   * Runs the stage without this test registering any live instance or current
+   * state, and expects the current-state map of partition
+   * {@code testResourceName_0} to be empty.
+   */
+  @Test
+  public void testEmptyCS()
+  {
+    Map<String, Resource> resourceMap = getResourceMap();
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    // AssertJUnit follows the JUnit convention: expected value first, actual second.
+    AssertJUnit.assertEquals(0,
+                             output.getCurrentStateMap("testResourceName",
+                                                       new Partition("testResourceName_0"))
+                                   .size());
+  }
+
+  /**
+   * Exercises the stage in three steps against five live instances:
+   * <ol>
+   * <li>with no messages or current states, the current-state map is empty;</li>
+   * <li>after an OFFLINE-&gt;SLAVE transition message is written for
+   * {@code localhost_3}, the pending state of {@code testResourceName_1} is
+   * {@code SLAVE};</li>
+   * <li>after current states are written under two sessions
+   * ({@code session_3} and {@code session_dead}), only the {@code session_3}
+   * value ({@code OFFLINE}) is reported.</li>
+   * </ol>
+   */
+  @Test
+  public void testSimpleCS()
+  {
+    // setup resource
+    Map<String, Resource> resourceMap = getResourceMap();
+
+    setupLiveInstances(5);
+
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output1 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    AssertJUnit.assertEquals(0,
+                             output1.getCurrentStateMap("testResourceName",
+                                                        new Partition("testResourceName_0"))
+                                    .size());
+
+    // Add a state transition message
+    Message message = new Message(Message.MessageType.STATE_TRANSITION, "msg1");
+    message.setFromState("OFFLINE");
+    message.setToState("SLAVE");
+    message.setResourceName("testResourceName");
+    message.setPartitionName("testResourceName_1");
+    message.setTgtName("localhost_3");
+    message.setTgtSessionId("session_3");
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.message("localhost_3", message.getId()), message);
+
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output2 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    String pendingState =
+        output2.getPendingState("testResourceName",
+                                new Partition("testResourceName_1"),
+                                "localhost_3");
+    AssertJUnit.assertEquals("SLAVE", pendingState);
+
+    // Add a current state that matches sessionId and one that does not match.
+    // NOTE(review): presumably "session_3" is the live session that
+    // setupLiveInstances() registers for localhost_3 -- confirm in BaseStageTest.
+    ZNRecord record1 = new ZNRecord("testResourceName");
+    CurrentState stateWithLiveSession = new CurrentState(record1);
+    stateWithLiveSession.setSessionId("session_3");
+    stateWithLiveSession.setStateModelDefRef("MasterSlave");
+    stateWithLiveSession.setState("testResourceName_1", "OFFLINE");
+    ZNRecord record2 = new ZNRecord("testResourceName");
+    CurrentState stateWithDeadSession = new CurrentState(record2);
+    stateWithDeadSession.setSessionId("session_dead");
+    stateWithDeadSession.setStateModelDefRef("MasterSlave");
+    stateWithDeadSession.setState("testResourceName_1", "MASTER");
+
+    accessor.setProperty(keyBuilder.currentState("localhost_3",
+                                                 "session_3",
+                                                 "testResourceName"),
+                         stateWithLiveSession);
+    accessor.setProperty(keyBuilder.currentState("localhost_3",
+                                                 "session_dead",
+                                                 "testResourceName"),
+                         stateWithDeadSession);
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output3 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    String currentState =
+        output3.getCurrentState("testResourceName",
+                                new Partition("testResourceName_1"),
+                                "localhost_3");
+    // Only the value stored under session_3 is expected; the MASTER value
+    // stored under session_dead must not show up.
+    AssertJUnit.assertEquals("OFFLINE", currentState);
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
new file mode 100644
index 0000000..ff1ad43
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -0,0 +1,365 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.MessageThrottleStageOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link MessageThrottleStage}: its required event attributes, the
+ * matching/selection of message constraints, and the throttling of selected
+ * messages against those constraints.
+ */
+public class TestMessageThrottleStage extends ZkUnitTestBase
+{
+  private static final Logger LOG =
+      Logger.getLogger(TestMessageThrottleStage.class.getName());
+  // Short class name, used to build a distinct cluster name per test.
+  final String _className = getShortClassName();
+
+  /**
+   * Verifies that the throttle stage throws while any of its inputs is missing
+   * from the event, and that once all inputs are present (and no constraint
+   * has been written by this test) the single selected message passes through.
+   */
+  @Test
+  public void testMsgThrottleBasic() throws Exception
+  {
+    String clusterName = "CLUSTER_" + _className + "_basic";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    assertThrottleStageFails(event,
+                             throttleStage,
+                             "Should throw exception since DATA_CACHE is null");
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+
+    assertThrottleStageFails(event,
+                             throttleStage,
+                             "Should throw exception since RESOURCE is null");
+    runStage(event, new ResourceComputationStage());
+
+    assertThrottleStageFails(event,
+                             throttleStage,
+                             "Should throw exception since MESSAGE_SELECT is null");
+    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    List<Message> selectMessages = new ArrayList<Message>();
+    Message msg =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-001",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+    selectMessages.add(msg);
+
+    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+
+    runStage(event, throttleStage);
+
+    MessageThrottleStageOutput msgThrottleOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"))
+                                         .size(),
+                        1);
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Writes six message constraints, then checks (a) which constraints match
+   * and which are selected for a message to localhost_0 and a message to
+   * localhost_1, and (b) that the throttle stage lets only four of six
+   * selected messages through.
+   */
+  @Test
+  public void testMsgThrottleConstraints() throws Exception
+  {
+    String clusterName = "CLUSTER_" + _className + "_constraints";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    // setup constraints
+    ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
+
+    // constraint0:
+    // "MESSAGE_TYPE=STATE_TRANSITION,CONSTRAINT_VALUE=ANY"
+    ConstraintItem constraint0 =
+        addConstraint(record, "constraint0",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "CONSTRAINT_VALUE", "ANY");
+
+    // constraint1:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,CONSTRAINT_VALUE=50"
+    ConstraintItem constraint1 =
+        addConstraint(record, "constraint1",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "TRANSITION", "OFFLINE-SLAVE",
+                      "CONSTRAINT_VALUE", "50");
+
+    // constraint2:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=TestDB,CONSTRAINT_VALUE=2"
+    ConstraintItem constraint2 =
+        addConstraint(record, "constraint2",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "TRANSITION", "OFFLINE-SLAVE",
+                      "INSTANCE", ".*",
+                      "RESOURCE", "TestDB",
+                      "CONSTRAINT_VALUE", "2");
+
+    // constraint3:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_1,RESOURCE=.*,CONSTRAINT_VALUE=1"
+    ConstraintItem constraint3 =
+        addConstraint(record, "constraint3",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "TRANSITION", "OFFLINE-SLAVE",
+                      "INSTANCE", "localhost_1",
+                      "RESOURCE", ".*",
+                      "CONSTRAINT_VALUE", "1");
+
+    // constraint4:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=.*,CONSTRAINT_VALUE=10"
+    ConstraintItem constraint4 =
+        addConstraint(record, "constraint4",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "TRANSITION", "OFFLINE-SLAVE",
+                      "INSTANCE", ".*",
+                      "RESOURCE", ".*",
+                      "CONSTRAINT_VALUE", "10");
+
+    // constraint5:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_0,RESOURCE=TestDB,CONSTRAINT_VALUE=3"
+    ConstraintItem constraint5 =
+        addConstraint(record, "constraint5",
+                      "MESSAGE_TYPE", "STATE_TRANSITION",
+                      "TRANSITION", "OFFLINE-SLAVE",
+                      "INSTANCE", "localhost_0",
+                      "RESOURCE", "TestDB",
+                      "CONSTRAINT_VALUE", "3");
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
+                         new ClusterConstraints(record));
+
+    // Read the constraints back through the accessor so the test exercises
+    // the stored copy rather than the local record.
+    ClusterConstraints constraint =
+        accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
+
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
+
+    // test constraintSelection
+    // message1: hit constraintSelection rule1 and rule2
+    Message msg1 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-001",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Map<ConstraintAttribute, String> msgAttr =
+        ClusterConstraints.toConstraintAttributes(msg1);
+    Set<ConstraintItem> matches = constraint.match(msgAttr);
+    System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
+    // Everything except constraint3 (pinned to localhost_1) matches msg1.
+    Assert.assertEquals(matches.size(), 5);
+    Assert.assertTrue(containsConstraint(matches, constraint0));
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint2));
+    Assert.assertTrue(containsConstraint(matches, constraint4));
+    Assert.assertTrue(containsConstraint(matches, constraint5));
+
+    matches = throttleStage.selectConstraints(matches, msgAttr);
+    System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 2);
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint5));
+
+    // message2: hit constraintSelection rule1, rule2, and rule3
+    Message msg2 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-002",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_1");
+
+    msgAttr = ClusterConstraints.toConstraintAttributes(msg2);
+    matches = constraint.match(msgAttr);
+    System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
+    // Everything except constraint5 (pinned to localhost_0) matches msg2.
+    Assert.assertEquals(matches.size(), 5);
+    Assert.assertTrue(containsConstraint(matches, constraint0));
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint2));
+    Assert.assertTrue(containsConstraint(matches, constraint3));
+    Assert.assertTrue(containsConstraint(matches, constraint4));
+
+    matches = throttleStage.selectConstraints(matches, msgAttr);
+    System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 2);
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint3));
+
+    // test messageThrottleStage
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+    runStage(event, new ResourceComputationStage());
+    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+
+    Message msg3 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-003",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg4 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-004",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg5 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-005",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg6 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-006",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_1");
+
+    // NOTE(review): the expected outcome is consistent with constraint5
+    // (limit 3 for localhost_0) admitting msg1, msg3, msg4 and constraint3
+    // (limit 1 for localhost_1) admitting msg2 -- confirm against
+    // MessageThrottleStage.
+    List<Message> selectMessages = new ArrayList<Message>();
+    selectMessages.add(msg1);
+    selectMessages.add(msg2);
+    selectMessages.add(msg3);
+    selectMessages.add(msg4);
+    selectMessages.add(msg5); // should be throttled
+    selectMessages.add(msg6); // should be throttled
+
+    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+
+    runStage(event, throttleStage);
+
+    MessageThrottleStageOutput msgThrottleOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    List<Message> throttleMessages =
+        msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+    Assert.assertEquals(throttleMessages.size(), 4);
+    Assert.assertTrue(throttleMessages.contains(msg1));
+    Assert.assertTrue(throttleMessages.contains(msg2));
+    Assert.assertTrue(throttleMessages.contains(msg3));
+    Assert.assertTrue(throttleMessages.contains(msg4));
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Runs the throttle stage and fails the test unless the stage throws an
+   * {@link Exception}.
+   *
+   * @param event          event passed to the stage
+   * @param stage          stage expected to throw
+   * @param failureMessage message reported when the stage does not throw
+   */
+  private void assertThrottleStageFails(ClusterEvent event,
+                                        MessageThrottleStage stage,
+                                        String failureMessage)
+  {
+    try
+    {
+      runStage(event, stage);
+    }
+    catch (Exception expected)
+    {
+      // OK: the stage is expected to reject the incomplete event
+      return;
+    }
+    Assert.fail(failureMessage);
+  }
+
+  /**
+   * Stores a constraint as a map field of {@code record} and returns the
+   * {@link ConstraintItem} built from that stored map.
+   *
+   * @param record       record that receives the map field
+   * @param constraintId map-field key, e.g. {@code "constraint0"}
+   * @param keyValues    alternating attribute names and values
+   * @return the constraint item built from the stored attributes
+   */
+  private static ConstraintItem addConstraint(ZNRecord record,
+                                              String constraintId,
+                                              String... keyValues)
+  {
+    Map<String, String> attributes = new TreeMap<String, String>();
+    for (int i = 0; i + 1 < keyValues.length; i += 2)
+    {
+      attributes.put(keyValues[i], keyValues[i + 1]);
+    }
+    record.setMapField(constraintId, attributes);
+    return new ConstraintItem(record.getMapField(constraintId));
+  }
+
+  /**
+   * Returns true when {@code constraints} holds an item whose string form
+   * equals that of {@code constraint}; items are compared via
+   * {@code toString()} rather than {@code equals()}.
+   */
+  private boolean containsConstraint(Set<ConstraintItem> constraints,
+                                     ConstraintItem constraint)
+  {
+    for (ConstraintItem item : constraints)
+    {
+      if (item.toString().equals(constraint.toString()))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // TODO: add pending message test case
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
new file mode 100644
index 0000000..fd9a7e2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -0,0 +1,118 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link MessageSelectionStage#selectMessages} using two live
+ * instances and MASTER/SLAVE state bounds.
+ */
+public class TestMsgSelectionStage
+{
+  /**
+   * localhost_1 is MASTER and localhost_0 is SLAVE; both a SLAVE-&gt;MASTER
+   * message (for localhost_0) and a MASTER-&gt;SLAVE message (for localhost_1)
+   * are offered. Only the MASTER-&gt;SLAVE message ({@code msgId_1}) is
+   * expected to be selected.
+   */
+  @Test
+  public void testMasterXfer()
+  {
+    System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
+
+    Map<String, LiveInstance> live = twoLiveInstances();
+
+    Map<String, String> current = new HashMap<String, String>();
+    current.put("localhost_0", "SLAVE");
+    current.put("localhost_1", "MASTER");
+
+    Map<String, String> pending = new HashMap<String, String>();
+
+    Message promote = TestHelper.createMessage("msgId_0",
+                                               "SLAVE",
+                                               "MASTER",
+                                               "localhost_0",
+                                               "TestDB",
+                                               "TestDB_0");
+    Message demote = TestHelper.createMessage("msgId_1",
+                                              "MASTER",
+                                              "SLAVE",
+                                              "localhost_1",
+                                              "TestDB",
+                                              "TestDB_0");
+    List<Message> candidates = new ArrayList<Message>();
+    candidates.add(promote);
+    candidates.add(demote);
+
+    MessageSelectionStage stage = new MessageSelectionStage();
+    List<Message> selected = stage.selectMessages(live,
+                                                  current,
+                                                  pending,
+                                                  candidates,
+                                                  masterSlaveBounds(),
+                                                  masterSlavePriorities(),
+                                                  "OFFLINE");
+
+    Assert.assertEquals(selected.size(), 1);
+    Assert.assertEquals(selected.get(0).getMsgId(), "msgId_1");
+    System.out.println("END testMasterXfer at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Both instances are SLAVE and localhost_1 already has a pending MASTER
+   * state; a SLAVE-&gt;MASTER message for localhost_0 is offered and no
+   * message is expected to be selected.
+   */
+  @Test
+  public void testMasterXferAfterMasterResume()
+  {
+    System.out.println("START testMasterXferAfterMasterResume at "
+        + new Date(System.currentTimeMillis()));
+
+    Map<String, LiveInstance> live = twoLiveInstances();
+
+    Map<String, String> current = new HashMap<String, String>();
+    current.put("localhost_0", "SLAVE");
+    current.put("localhost_1", "SLAVE");
+
+    Map<String, String> pending = new HashMap<String, String>();
+    pending.put("localhost_1", "MASTER");
+
+    List<Message> candidates = new ArrayList<Message>();
+    candidates.add(TestHelper.createMessage("msgId_0",
+                                            "SLAVE",
+                                            "MASTER",
+                                            "localhost_0",
+                                            "TestDB",
+                                            "TestDB_0"));
+
+    MessageSelectionStage stage = new MessageSelectionStage();
+    List<Message> selected = stage.selectMessages(live,
+                                                  current,
+                                                  pending,
+                                                  candidates,
+                                                  masterSlaveBounds(),
+                                                  masterSlavePriorities(),
+                                                  "OFFLINE");
+
+    Assert.assertEquals(selected.size(), 0);
+    System.out.println("END testMasterXferAfterMasterResume at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /** Builds the live-instance map shared by both tests: localhost_0 and localhost_1. */
+  private static Map<String, LiveInstance> twoLiveInstances()
+  {
+    Map<String, LiveInstance> live = new HashMap<String, LiveInstance>();
+    for (String name : new String[] { "localhost_0", "localhost_1" })
+    {
+      live.put(name, new LiveInstance(name));
+    }
+    return live;
+  }
+
+  /** Builds the state bounds shared by both tests: MASTER (0, 1) and SLAVE (0, 2). */
+  private static Map<String, Bounds> masterSlaveBounds()
+  {
+    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+    bounds.put("MASTER", new Bounds(0, 1));
+    bounds.put("SLAVE", new Bounds(0, 2));
+    return bounds;
+  }
+
+  /** Builds the transition priorities shared by both tests: MASTER-SLAVE=0, SLAVE-MASTER=1. */
+  private static Map<String, Integer> masterSlavePriorities()
+  {
+    Map<String, Integer> priorities = new HashMap<String, Integer>();
+    priorities.put("MASTER-SLAVE", 0);
+    priorities.put("SLAVE-MASTER", 1);
+    return priorities;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
new file mode 100644
index 0000000..ded4ec6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -0,0 +1,38 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks how {@link StatsAggregationStage} extracts the instance, resource
+ * and partition names from alert/stat name strings.
+ */
+public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase
+{
+  /**
+   * Parses several stat names against the manager of controller
+   * {@code CONTROLLER_PREFIX + "_0"} and verifies the extracted names.
+   */
+  @Test
+  public void TestParse()
+  {
+    StatsAggregationStage stage = new StatsAggregationStage();
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+
+    // assertEquals/assertNull report the actual value on failure and turn a
+    // null result into an assertion failure instead of a NullPointerException.
+    String instanceName =
+        stage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
+    Assert.assertEquals(instanceName, "localhost_12918");
+
+    // NOTE(review): presumably localhost_12955 is not an instance of the
+    // cluster started by ZkStandAloneCMTestBase, hence null -- confirm.
+    instanceName =
+        stage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
+    Assert.assertNull(instanceName);
+
+    instanceName =
+        stage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
+    Assert.assertEquals(instanceName, "localhost_12922");
+
+    String resourceName =
+        stage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency", manager);
+    Assert.assertEquals(resourceName, "TestDB");
+
+    String partitionName =
+        stage.parsePartitionName("localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency",
+                                 manager);
+    Assert.assertEquals(partitionName, "TestDB_22");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
new file mode 100644
index 0000000..5b9ace5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -0,0 +1,428 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestRebalancePipeline extends ZkUnitTestBase
+{
+ private static final Logger LOG =
+ Logger.getLogger(TestRebalancePipeline.class.getName());
+ final String _className = getShortClassName();
+
+ @Test
+ public void testDuplicateMsg()
+ {
+ String clusterName = "CLUSTER_" + _className + "_dup";
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ // Scenario: node0's current state is updated while its O->S message is kept; round2 must select no message.
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+ ClusterEvent event = new ClusterEvent("testEvent");
+ event.addAttribute("helixmanager", manager);
+
+ final String resourceName = "testResource_dup";
+ String[] resourceGroups = new String[] { resourceName };
+ // ideal state: node0 is MASTER, node1 is SLAVE
+ // replica=2 means 1 master and 1 slave
+ setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+ setupLiveInstances(clusterName, new int[] { 0, 1 });
+ setupStateModel(clusterName);
+
+ // cluster data cache refresh pipeline
+ Pipeline dataRefresh = new Pipeline();
+ dataRefresh.addStage(new ReadClusterDataStage());
+
+ // rebalance pipeline
+ Pipeline rebalancePipeline = new Pipeline();
+ rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new CurrentStateComputationStage());
+ rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new MessageThrottleStage());
+ rebalancePipeline.addStage(new TaskAssignmentStage());
+
+ // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
+ setCurrentState(clusterName,
+ "localhost_0",
+ resourceName,
+ resourceName + "_0",
+ "session_0",
+ "OFFLINE");
+ setCurrentState(clusterName,
+ "localhost_1",
+ resourceName,
+ resourceName + "_0",
+ "session_1",
+ "SLAVE");
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ MessageSelectionStageOutput msgSelOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ List<Message> messages =
+ msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 1,
+ "Should output 1 message: OFFLINE-SLAVE for node0");
+ Message message = messages.get(0);
+ Assert.assertEquals(message.getFromState(), "OFFLINE");
+ Assert.assertEquals(message.getToState(), "SLAVE");
+ Assert.assertEquals(message.getTgtName(), "localhost_0");
+ // round2: update node0 currentState to SLAVE but keep the pending O->S message;
+ // the controller must not send S->M until that message is removed.
+ // NOTE(review): written under "session_1" while round1 used "session_0" for node0 -- confirm intended.
+ setCurrentState(clusterName,
+ "localhost_0",
+ resourceName,
+ resourceName + "_0",
+ "session_1",
+ "SLAVE");
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 0,
+ "Should NOT output 1 message: SLAVE-MASTER for node1");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ @Test
+ public void testMsgTriggeredRebalance() throws Exception
+ {
+ String clusterName = "CLUSTER_" + _className + "_msgTrigger";
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ // Scenario: a started controller sends nothing new while messages are pending, then S->M once node0's is removed.
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+ ClusterEvent event = new ClusterEvent("testEvent");
+ // NOTE(review): `manager` and `event` are not referenced again in this method.
+ final String resourceName = "testResource_dup";
+ String[] resourceGroups = new String[] { resourceName };
+
+ TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+ // ideal state: node0 is MASTER, node1 is SLAVE
+ // replica=2 means 1 master and 1 slave
+ setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+ setupStateModel(clusterName);
+ setupInstances(clusterName, new int[] { 0, 1 });
+ setupLiveInstances(clusterName, new int[] { 0, 1 });
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+
+ // round1: controller sends O->S to both node0 and node1 (the fixed sleep presumably gives it time to do so)
+ Thread.sleep(1000);
+
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+ Assert.assertEquals(messages.size(), 1);
+ messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
+ Assert.assertEquals(messages.size(), 1);
+
+ // round2: node0 and node1 update their current states without removing the
+ // pending messages. The controller's rebalance pipeline should be triggered,
+ // but because the messages are still there no new messages should be sent
+ // (each node is still expected to have exactly one message).
+ setCurrentState(clusterName,
+ "localhost_0",
+ resourceName,
+ resourceName + "_0",
+ "session_0",
+ "SLAVE");
+ setCurrentState(clusterName,
+ "localhost_1",
+ resourceName,
+ resourceName + "_0",
+ "session_1",
+ "SLAVE");
+ Thread.sleep(1000);
+ messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+ Assert.assertEquals(messages.size(), 1);
+
+ messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
+ Assert.assertEquals(messages.size(), 1);
+
+ // round3: node0's message is removed; the controller's rebalance pipeline
+ // should be triggered again and send S->M to node0 (checked through the
+ // TO_STATE field of the single message found afterwards).
+ messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+ accessor.removeProperty(keyBuilder.message("localhost_0", messages.get(0)));
+ Thread.sleep(1000);
+
+ messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+ Assert.assertEquals(messages.size(), 1);
+ ZNRecord msg =
+ accessor.getProperty(keyBuilder.message("localhost_0", messages.get(0)))
+ .getRecord();
+ String toState = msg.getSimpleField(Attributes.TO_STATE.toString());
+ Assert.assertEquals(toState, "MASTER");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ @Test
+ public void testChangeIdealStateWithPendingMsg()
+ {
+ String clusterName = "CLUSTER_" + _className + "_pending";
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ // Scenario: the resource is dropped while O->S for localhost_0 is pending; O->DROPPED follows only after its removal.
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+ ClusterEvent event = new ClusterEvent("testEvent");
+ event.addAttribute("helixmanager", manager);
+
+ final String resourceName = "testResource_pending";
+ String[] resourceGroups = new String[] { resourceName };
+ // ideal state: node0 is MASTER, node1 is SLAVE
+ // replica=2 means 1 master and 1 slave
+ setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+ setupLiveInstances(clusterName, new int[] { 0, 1 });
+ setupStateModel(clusterName);
+
+ // cluster data cache refresh pipeline
+ Pipeline dataRefresh = new Pipeline();
+ dataRefresh.addStage(new ReadClusterDataStage());
+
+ // rebalance pipeline
+ Pipeline rebalancePipeline = new Pipeline();
+ rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new CurrentStateComputationStage());
+ rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new MessageThrottleStage());
+ rebalancePipeline.addStage(new TaskAssignmentStage());
+
+ // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
+ setCurrentState(clusterName,
+ "localhost_0",
+ resourceName,
+ resourceName + "_0",
+ "session_0",
+ "OFFLINE");
+ setCurrentState(clusterName,
+ "localhost_1",
+ resourceName,
+ resourceName + "_0",
+ "session_1",
+ "SLAVE");
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ MessageSelectionStageOutput msgSelOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ List<Message> messages =
+ msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 1,
+ "Should output 1 message: OFFLINE-SLAVE for node0");
+ Message message = messages.get(0);
+ Assert.assertEquals(message.getFromState(), "OFFLINE");
+ Assert.assertEquals(message.getToState(), "SLAVE");
+ Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+ // round2: drop resource, but keep the pending message; the controller should not send
+ // O->DROPPED to localhost_0 until O->S is done, and only sends S->OFFLINE to localhost_1
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.dropResource(clusterName, resourceName);
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 1,
+ "Should output only 1 message: SLAVE->OFFLINE for localhost_1");
+
+ message = messages.get(0);
+ Assert.assertEquals(message.getFromState(), "SLAVE");
+ Assert.assertEquals(message.getToState(), "OFFLINE");
+ Assert.assertEquals(message.getTgtName(), "localhost_1");
+
+ // round3: remove O->S for localhost_0, controller should now send O->DROPPED to
+ // localhost_0
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+ accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 1,
+ "Should output 1 message: OFFLINE->DROPPED for localhost_0");
+ message = messages.get(0);
+ Assert.assertEquals(message.getFromState(), "OFFLINE");
+ Assert.assertEquals(message.getToState(), "DROPPED");
+ Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ @Test
+ public void testMasterXfer()
+ {
+ String clusterName = "CLUSTER_" + _className + "_xfer";
+ // Scenario: only node1 is live and gets S->M; once node0 joins as SLAVE, no further message may be selected.
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+ ClusterEvent event = new ClusterEvent("testEvent");
+ event.addAttribute("helixmanager", manager);
+
+ final String resourceName = "testResource_xfer";
+ String[] resourceGroups = new String[] { resourceName };
+ // ideal state: node0 is MASTER, node1 is SLAVE
+ // replica=2 means 1 master and 1 slave
+ setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+ setupLiveInstances(clusterName, new int[] { 1 });
+ setupStateModel(clusterName);
+
+ // cluster data cache refresh pipeline
+ Pipeline dataRefresh = new Pipeline();
+ dataRefresh.addStage(new ReadClusterDataStage());
+
+ // rebalance pipeline
+ Pipeline rebalancePipeline = new Pipeline();
+ rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new CurrentStateComputationStage());
+ rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new MessageThrottleStage());
+ rebalancePipeline.addStage(new TaskAssignmentStage());
+
+ // round1: set node1 currentState to SLAVE
+ setCurrentState(clusterName,
+ "localhost_1",
+ resourceName,
+ resourceName + "_0",
+ "session_1",
+ "SLAVE");
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ MessageSelectionStageOutput msgSelOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ List<Message> messages =
+ msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 1,
+ "Should output 1 message: SLAVE-MASTER for node1");
+ Message message = messages.get(0);
+ Assert.assertEquals(message.getFromState(), "SLAVE");
+ Assert.assertEquals(message.getToState(), "MASTER");
+ Assert.assertEquals(message.getTgtName(), "localhost_1");
+
+ // round2: register node0 as live with currentState SLAVE while keeping the round1
+ // message; the controller should not send S->M to node0 until that message is removed
+ setupLiveInstances(clusterName, new int[] { 0 });
+ setCurrentState(clusterName,
+ "localhost_0",
+ resourceName,
+ resourceName + "_0",
+ "session_0",
+ "SLAVE");
+
+ runPipeline(event, dataRefresh);
+ runPipeline(event, rebalancePipeline);
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ Assert.assertEquals(messages.size(),
+ 0,
+ "Should NOT output 1 message: SLAVE-MASTER for node0");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ protected void setCurrentState(String clusterName,
+ String instance,
+ String resourceGroupName,
+ String resourceKey,
+ String sessionId,
+ String state)
+ {
+ // Record `state` for one partition (resourceKey) of the resource, tagged with the session.
+ CurrentState snapshot = new CurrentState(resourceGroupName);
+ snapshot.setState(resourceKey, state);
+ snapshot.setSessionId(sessionId);
+ snapshot.setStateModelDefRef("MasterSlave");
+
+ // Store it in the cluster under the instance/session/resource current-state key.
+ ZKHelixDataAccessor zkAccessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ zkAccessor.setProperty(zkAccessor.keyBuilder().currentState(instance, sessionId, resourceGroupName), snapshot);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
new file mode 100644
index 0000000..2ca2472
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -0,0 +1,245 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestResourceComputationStage extends BaseStageTest
+{
+ /**
+ * Case where we have one resource in IdealState: the stage should expose exactly that
+ * resource, with the ideal state's state model and the configured partition count.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSimple() throws Exception
+ {
+ int nodes = 5;
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < nodes; i++)
+ {
+ instances.add("localhost_" + i);
+ }
+ int partitions = 10;
+ int replicas = 1;
+ String resourceName = "testResource";
+ ZNRecord record = IdealStateCalculatorForStorageNode.calculateIdealState(
+ instances, partitions, replicas, resourceName, "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(record);
+ idealState.setStateModelDefRef("MasterSlave");
+
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.idealStates(resourceName),
+ idealState);
+ ResourceComputationStage stage = new ResourceComputationStage();
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, stage);
+
+ Map<String, Resource> resource = event
+ .getAttribute(AttributeName.RESOURCES.toString());
+ AssertJUnit.assertEquals(1, resource.size());
+
+ // AssertJUnit takes (expected, actual); keeping that order makes failure messages read correctly.
+ Map.Entry<String, Resource> only = resource.entrySet().iterator().next();
+ AssertJUnit.assertEquals(resourceName, only.getKey());
+ AssertJUnit.assertEquals(resourceName, only.getValue().getResourceName());
+ AssertJUnit.assertEquals(idealState.getStateModelDefRef(),
+ only.getValue().getStateModelDefRef());
+ AssertJUnit.assertEquals(partitions, only.getValue().getPartitions().size());
+ }
+
+ @Test
+ public void testMultipleResources() throws Exception
+ {
+ // Two resources in IdealState: each must be exposed with its own state model and partition count.
+ String[] resources = new String[]
+ { "testResource1", "testResource2" };
+ List<IdealState> idealStates = setupIdealState(5, resources, 10, 1);
+ ResourceComputationStage stage = new ResourceComputationStage();
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, stage);
+
+ Map<String, Resource> resourceMap = event
+ .getAttribute(AttributeName.RESOURCES.toString());
+ AssertJUnit.assertEquals(resources.length, resourceMap.size());
+
+ for (int i = 0; i < resources.length; i++)
+ {
+ String resourceName = resources[i];
+ IdealState idealState = idealStates.get(i);
+ AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
+ // AssertJUnit takes (expected, actual), so the ideal-state values come first.
+ Resource computed = resourceMap.get(resourceName);
+ AssertJUnit.assertEquals(resourceName, computed.getResourceName());
+ AssertJUnit.assertEquals(idealState.getStateModelDefRef(), computed.getStateModelDefRef());
+ AssertJUnit.assertEquals(idealState.getNumPartitions(),
+ computed.getPartitions().size());
+ }
+ }
+
+ @Test
+ public void testMultipleResourcesWithSomeDropped() throws Exception
+ {
+ int nodes = 5;
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < nodes; i++)
+ {
+ instances.add("localhost_" + i);
+ }
+ String[] resources = new String[]
+ { "testResource1", "testResource2" };
+ List<IdealState> idealStates = new ArrayList<IdealState>();
+ for (int i = 0; i < resources.length; i++)
+ {
+ int partitions = 10;
+ int replicas = 1;
+ String resourceName = resources[i];
+ ZNRecord record = IdealStateCalculatorForStorageNode
+ .calculateIdealState(instances, partitions, replicas,
+ resourceName, "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(record);
+ idealState.setStateModelDefRef("MasterSlave");
+
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.idealStates(resourceName),
+ idealState);
+
+ // remembered so the assertions below can compare against it
+ idealStates.add(idealState);
+ }
+ // Add a live instance whose current state contains a resource that no
+ // longer exists in the ideal state.
+ String instanceName = "localhost_" + 3;
+ LiveInstance liveInstance = new LiveInstance(instanceName);
+ String sessionId = UUID.randomUUID().toString();
+ liveInstance.setSessionId(sessionId);
+
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.liveInstance(instanceName),
+ liveInstance);
+
+ String oldResource = "testResourceOld";
+ CurrentState currentState = new CurrentState(oldResource);
+ currentState.setState("testResourceOld_0", "OFFLINE");
+ currentState.setState("testResourceOld_1", "SLAVE");
+ currentState.setState("testResourceOld_2", "MASTER");
+ currentState.setStateModelDefRef("MasterSlave");
+ accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
+ currentState);
+
+ ResourceComputationStage stage = new ResourceComputationStage();
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, stage);
+
+ Map<String, Resource> resourceMap = event
+ .getAttribute(AttributeName.RESOURCES.toString());
+ // +1 for the resource that is only present in the current state
+ AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
+
+ for (int i = 0; i < resources.length; i++)
+ {
+ String resourceName = resources[i];
+ IdealState idealState = idealStates.get(i);
+ AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
+ // AssertJUnit takes (expected, actual), so the ideal-state values come first.
+ Resource computed = resourceMap.get(resourceName);
+ AssertJUnit.assertEquals(resourceName, computed.getResourceName());
+ AssertJUnit.assertEquals(idealState.getStateModelDefRef(),
+ computed.getStateModelDefRef());
+ AssertJUnit.assertEquals(idealState.getNumPartitions(), computed.getPartitions().size());
+ }
+ // Test the data derived from CurrentState: the dropped resource must still be
+ // exposed, with one partition per entry recorded in the current state.
+ AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
+ Resource dropped = resourceMap.get(oldResource);
+ AssertJUnit.assertEquals(oldResource, dropped.getResourceName());
+ AssertJUnit.assertEquals(currentState.getStateModelDefRef(),
+ dropped.getStateModelDefRef());
+ AssertJUnit.assertEquals(currentState.getPartitionStateMap().size(),
+ dropped.getPartitions().size());
+ AssertJUnit.assertNotNull(dropped.getPartition("testResourceOld_0"));
+ AssertJUnit.assertNotNull(dropped.getPartition("testResourceOld_1"));
+ AssertJUnit.assertNotNull(dropped.getPartition("testResourceOld_2"));
+
+ }
+
+ @Test
+ public void testNull()
+ {
+ // An event that carries no attributes is expected to make process() throw.
+ ResourceComputationStage computation = new ResourceComputationStage();
+ computation.init(new StageContext());
+ computation.preProcess();
+ ClusterEvent emptyEvent = new ClusterEvent("sampleEvent");
+ boolean threw = false;
+ try
+ {
+ computation.process(emptyEvent);
+ } catch (Exception expected)
+ {
+ threw = true;
+ }
+ AssertJUnit.assertTrue(threw);
+ computation.postProcess();
+ }
+
+
+// public void testEmptyCluster()
+// {
+// ClusterEvent event = new ClusterEvent("sampleEvent");
+// ClusterManager manager = new Mocks.MockManager();
+// event.addAttribute("clustermanager", manager);
+// ResourceComputationStage stage = new ResourceComputationStage();
+// StageContext context = new StageContext();
+// stage.init(context);
+// stage.preProcess();
+// boolean exceptionCaught = false;
+// try
+// {
+// stage.process(event);
+// } catch (Exception e)
+// {
+// exceptionCaught = true;
+// }
+// Assert.assertTrue(exceptionCaught);
+// stage.postProcess();
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
new file mode 100644
index 0000000..d21d02d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAddDropAlert extends ZkIntegrationTestBase
+{
+ ZkClient _zkClient;
+ protected ClusterSetup _setupTool = null;
+ protected final String _alertStr =
+ "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
+ protected final String _alertStatusStr = _alertStr; // +" : (*)";
+ protected final String _dbName = "TestDB0";
+
+ @BeforeClass()
+ public void beforeClass() throws Exception
+ {
+ _zkClient = new ZkClient(ZK_ADDR); // used by the test to read the alert status back
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ // cluster admin helper; the test uses it to add and drop the alert
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close(); // closes the client opened in beforeClass
+ }
+
+ public class AddDropAlertTransition extends MockTransition
+ {
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ String instance = message.getTgtName();
+ String partition = message.getPartitionName();
+
+ if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
+ {
+
+ // add a stat and report to ZK
+ // perhaps should keep reporter per instance...
+ ParticipantHealthReportCollectorImpl reporter =
+ new ParticipantHealthReportCollectorImpl(manager, instance);
+ MockEspressoHealthReportProvider provider =
+ new MockEspressoHealthReportProvider();
+ reporter.addHealthReportProvider(provider);
+ String statName = "latency";
+ provider.setStat(_dbName, statName, "15");
+ reporter.transmitHealthReports();
+
+ // sleep long enough for first set of alerts to report and alert to get deleted
+ // then change reported data
+ try
+ {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println("Error sleeping");
+ }
+ provider.setStat(_dbName, statName, "1");
+ reporter.transmitHealthReports();
+
+ /*
+ * for (int i = 0; i < 5; i++) { accessor.setProperty(PropertyType.HEALTHREPORT,
+ * new ZNRecord("mockAlerts" + i), instance, "mockAlerts"); try {
+ * Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated
+ * catch block e.printStackTrace(); } }
+ */
+ }
+ }
+ }
+
+ @Test()
+ public void testAddDropAlert() throws Exception
+ {
+ String clusterName = getShortClassName();
+ MockParticipant[] participants = new MockParticipant[5];
+ // Scenario: an alert added before startup must produce alert status; dropping it must clear that status.
+ System.out.println("START TestAddDropAlert at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource group
+ 5, // number of nodes
+ 1, // replicas (TODO: change back to 3)
+ "MasterSlave",
+ true); // do rebalance
+ // enableHealthCheck(clusterName);
+
+ _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+ StartCMResult cmResult =
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ // start participants
+ for (int i = 0; i < 5; i++) // one participant per node
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ new AddDropAlertTransition());
+ participants[i].syncStart();
+// new Thread(participants[i]).start();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // drop alert soon after adding, but leave enough time for alert to fire once
+ // Thread.sleep(3000);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ new HealthStatsAggregationTask(cmResult._manager).run();
+ String instance = "localhost_12918"; // NOTE(review): not used by any live code below
+ ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+ Map<String, Map<String, String>> recMap = record.getMapFields();
+ Set<String> keySet = recMap.keySet();
+ Assert.assertTrue(keySet.size() > 0);
+
+ _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
+ new HealthStatsAggregationTask(cmResult._manager).run();
+ // other verifications go here
+ // for (int i = 0; i < 1; i++) //change 1 back to 5
+ // {
+ // String instance = "localhost_" + (12918 + i);
+ record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+ recMap = record.getMapFields();
+ keySet = recMap.keySet();
+ Assert.assertEquals(keySet.size(), 0);
+ // }
+
+ System.out.println("END TestAddDropAlert at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
new file mode 100644
index 0000000..5699119
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -0,0 +1,189 @@
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for alert-triggered actions.
+ *
+ * The test registers alerts carrying ACTION(DISABLE_INSTANCE) and
+ * ACTION(DISABLE_PARTITION), publishes per-participant health reports, runs
+ * {@link HealthStatsAggregationTask} by hand and then checks the participant
+ * configs and the "TestDB" external view.
+ */
+public class TestAlertActionTriggering extends
+    ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  // Map-field key under which setHealthData() stores its metric values.
+  String _statName = "TestStat@DB=db1";
+  // Id of the health-report record written for every participant.
+  String _stat = "TestStat";
+  // Metric names referenced by the alert expressions in the test below.
+  String metricName1 = "TestMetric1";
+  String metricName2 = "TestMetric2";
+
+  /**
+   * Publishes a health report with both metrics for every participant, stored
+   * under the {@code TestStat@DB=db1} map field, then waits one second.
+   *
+   * @param val1 TestMetric1 value per participant, indexed by node number
+   * @param val2 TestMetric2 value per participant, indexed by node number
+   */
+  void setHealthData(int[] val1, int[] val2)
+  {
+    writeHealthStats(_statName, val1, val2);
+  }
+
+  /**
+   * Publishes a health report with only TestMetric2 for every participant,
+   * stored under the partition-scoped map field
+   * {@code TestStat@DB=TestDB;Partition=TestDB_3}, then waits one second.
+   *
+   * @param val1 TestMetric2 value per participant, indexed by node number
+   */
+  void setHealthData2(int[] val1)
+  {
+    writeHealthStats("TestStat@DB=TestDB;Partition=TestDB_3", null, val1);
+  }
+
+  /**
+   * Shared implementation of the two setHealthData variants: writes one
+   * HealthStat per participant and then sleeps for one second.
+   *
+   * @param statKey       map-field key the metric values are stored under
+   * @param metric1Values TestMetric1 values per node, or null to omit that metric
+   * @param metric2Values TestMetric2 values per node
+   */
+  private void writeHealthStats(String statKey, int[] metric1Values, int[] metric2Values)
+  {
+    for (int node = 0; node < NODE_NR; node++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + node);
+      HelixManager participant = _startCMResultMap.get(instanceName)._manager;
+
+      Map<String, String> metricValues = new HashMap<String, String>();
+      if (metric1Values != null)
+      {
+        metricValues.put(metricName1, String.valueOf(metric1Values[node]));
+      }
+      metricValues.put(metricName2, String.valueOf(metric2Values[node]));
+
+      ZNRecord record = new ZNRecord(_stat);
+      record.setSimpleField("TimeStamp", String.valueOf(new Date().getTime()));
+      record.setMapField(statKey, metricValues);
+
+      HelixDataAccessor accessor = participant.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.healthReport(participant.getInstanceName(), record.getId()),
+                           new HealthStat(record));
+    }
+
+    try
+    {
+      // presumably gives the written reports time to become visible - confirm
+      Thread.sleep(1000);
+    }
+    catch (InterruptedException e)
+    {
+      // Do not swallow the interrupt: restore the flag so the caller's next
+      // blocking call fails fast instead of the test running on regardless.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Asserts that the HELIX_ENABLED config of a participant parses to the
+   * expected boolean. A missing value parses as false, as in
+   * {@link Boolean#parseBoolean(String)}.
+   */
+  private void assertInstanceEnabled(ConfigAccessor configAccessor,
+                                     String clusterName,
+                                     String participant,
+                                     boolean expected)
+  {
+    ConfigScope scope =
+        new ConfigScopeBuilder().forCluster(clusterName).forParticipant(participant).build();
+    String isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
+    Assert.assertEquals(Boolean.parseBoolean(isEnabled), expected,
+                        "Unexpected HELIX_ENABLED for " + participant);
+  }
+
+  @Test
+  public void testAlertActionDisableNode() throws InterruptedException
+  {
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("healthChange.enabled", "true");
+    _setupTool.getClusterManagementTool().setConfig(scope, properties);
+
+    String alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)ACTION(DISABLE_INSTANCE)";
+    String alertStr2 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(120)ACTION(DISABLE_INSTANCE)";
+    String alertStr3 = "EXP(decay(1.0)(localhost_*.TestStat@DB=TestDB;Partition=*.TestMetric2))CMP(GREATER)CON(160)ACTION(DISABLE_PARTITION)";
+
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr1);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr2);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr3);
+
+    // Node 2 exceeds the TestMetric1 limit (22 > 20) and node 3 exceeds the
+    // TestMetric2 limit (163 > 120); all other values stay below the limits.
+    int[] metrics1 = {10, 15, 22, 12, 16};
+    int[] metrics2 = {22, 115, 22, 163, 16};
+    int[] metrics3 = {0, 0, 0, 0, 0};
+    setHealthData(metrics1, metrics2);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+
+    HealthStatsAggregationTask task = new HealthStatsAggregationTask(manager);
+    task.run();
+    Thread.sleep(4000);
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView("TestDB"));
+    Assert.assertNotNull(externalView, "External view for TestDB is missing");
+
+    // Test the DISABLE_INSTANCE alerts
+    String participant1 = "localhost_" + (START_PORT + 3);
+    String participant2 = "localhost_" + (START_PORT + 2);
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    String clusterName = manager.getClusterName();
+    assertInstanceEnabled(configAccessor, clusterName, participant1, false);
+    assertInstanceEnabled(configAccessor, clusterName, participant2, false);
+
+    // Every replica hosted on a disabled instance must be reported OFFLINE.
+    for (String partitionName : externalView.getRecord().getMapFields().keySet())
+    {
+      Map<String, String> stateMap = externalView.getRecord().getMapField(partitionName);
+      for (String hostName : stateMap.keySet())
+      {
+        if (hostName.equals(participant1) || hostName.equals(participant2))
+        {
+          Assert.assertEquals(stateMap.get(hostName), "OFFLINE");
+        }
+      }
+    }
+
+    // enable the disabled instances
+    setHealthData(metrics3, metrics3);
+    task.run();
+    Thread.sleep(1000);
+
+    manager.getClusterManagmentTool().enableInstance(clusterName, participant2, true);
+    manager.getClusterManagmentTool().enableInstance(clusterName, participant1, true);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // Test the DISABLE_PARTITION case: only node 4 exceeds the limit (163 > 160)
+    int[] metrics4 = {22, 115, 22, 16, 163};
+    setHealthData2(metrics4);
+    task.run();
+
+    // The previously disabled instances must stay enabled.
+    assertInstanceEnabled(configAccessor, clusterName, participant1, true);
+    assertInstanceEnabled(configAccessor, clusterName, participant2, true);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    String participant3 = "localhost_" + (START_PORT + 4);
+    externalView = helixDataAccessor.getProperty(keyBuilder.externalView("TestDB"));
+    Assert.assertNotNull(externalView, "External view for TestDB is missing");
+    Map<String, String> partitionStates = externalView.getRecord().getMapField("TestDB_3");
+    Assert.assertNotNull(partitionStates, "TestDB_3 is missing from the external view");
+    // Constant-first comparison: a missing replica fails the assertion
+    // instead of raising a NullPointerException.
+    Assert.assertTrue("OFFLINE".equalsIgnoreCase(partitionStates.get(participant3)),
+                      "TestDB_3 on " + participant3 + " should be OFFLINE");
+
+    InstanceConfig nodeConfig =
+        helixDataAccessor.getProperty(keyBuilder.instanceConfig(participant3));
+    Assert.assertNotNull(nodeConfig, "Instance config for " + participant3 + " is missing");
+    Assert.assertTrue(
+        nodeConfig.getRecord().getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString())
+                  .contains("TestDB_3"));
+
+  }
+}