You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[30/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
new file mode 100644
index 0000000..a839353
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestCurrentStateComputationStage extends BaseStageTest
+{
+
+  @Test
+  public void testEmptyCS()
+  {
+    Map<String, Resource> resourceMap = getResourceMap();
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    AssertJUnit.assertEquals(output.getCurrentStateMap("testResourceName",
+                                                       new Partition("testResourceName_0"))
+                                   .size(),
+                             0);
+  }
+
+  @Test
+  public void testSimpleCS()
+  {
+    // setup resource
+    Map<String, Resource> resourceMap = getResourceMap();
+
+    setupLiveInstances(5);
+
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output1 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    AssertJUnit.assertEquals(output1.getCurrentStateMap("testResourceName",
+                                                        new Partition("testResourceName_0"))
+                                    .size(),
+                             0);
+
+    // Add a state transition messages
+    Message message = new Message(Message.MessageType.STATE_TRANSITION, "msg1");
+    message.setFromState("OFFLINE");
+    message.setToState("SLAVE");
+    message.setResourceName("testResourceName");
+    message.setPartitionName("testResourceName_1");
+    message.setTgtName("localhost_3");
+    message.setTgtSessionId("session_3");
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
+
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output2 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    String pendingState =
+        output2.getPendingState("testResourceName",
+                                new Partition("testResourceName_1"),
+                                "localhost_3");
+    AssertJUnit.assertEquals(pendingState, "SLAVE");
+
+    ZNRecord record1 = new ZNRecord("testResourceName");
+    // Add a current state that matches sessionId and one that does not match
+    CurrentState stateWithLiveSession = new CurrentState(record1);
+    stateWithLiveSession.setSessionId("session_3");
+    stateWithLiveSession.setStateModelDefRef("MasterSlave");
+    stateWithLiveSession.setState("testResourceName_1", "OFFLINE");
+    ZNRecord record2 = new ZNRecord("testResourceName");
+    CurrentState stateWithDeadSession = new CurrentState(record2);
+    stateWithDeadSession.setSessionId("session_dead");
+    stateWithDeadSession.setStateModelDefRef("MasterSlave");
+    stateWithDeadSession.setState("testResourceName_1", "MASTER");
+
+    accessor.setProperty(keyBuilder.currentState("localhost_3",
+                                                 "session_3",
+                                                 "testResourceName"),
+                         stateWithLiveSession);
+    accessor.setProperty(keyBuilder.currentState("localhost_3",
+                                                 "session_dead",
+                                                 "testResourceName"),
+                         stateWithDeadSession);
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CurrentStateOutput output3 =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    String currentState =
+        output3.getCurrentState("testResourceName",
+                                new Partition("testResourceName_1"),
+                                "localhost_3");
+    AssertJUnit.assertEquals(currentState, "OFFLINE");
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
new file mode 100644
index 0000000..ff1ad43
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -0,0 +1,365 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.MessageThrottleStageOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestMessageThrottleStage extends ZkUnitTestBase
+{
+  private static final Logger LOG        =
+                                             Logger.getLogger(TestMessageThrottleStage.class.getName());
+  final String                _className = getShortClassName();
+
+  @Test
+  public void testMsgThrottleBasic() throws Exception
+  {
+    String clusterName = "CLUSTER_" + _className + "_basic";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    try
+    {
+      runStage(event, throttleStage);
+      Assert.fail("Should throw exception since DATA_CACHE is null");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+
+    try
+    {
+      runStage(event, throttleStage);
+      Assert.fail("Should throw exception since RESOURCE is null");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    runStage(event, new ResourceComputationStage());
+
+    try
+    {
+      runStage(event, throttleStage);
+      Assert.fail("Should throw exception since MESSAGE_SELECT is null");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    List<Message> selectMessages = new ArrayList<Message>();
+    Message msg =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-001",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+    selectMessages.add(msg);
+
+    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+
+    runStage(event, throttleStage);
+
+    MessageThrottleStageOutput msgThrottleOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"))
+                                         .size(),
+                        1);
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test()
+  public void testMsgThrottleConstraints() throws Exception
+  {
+    String clusterName = "CLUSTER_" + _className + "_constraints";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    // setup constraints
+    ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
+
+    // constraint0:
+    // "MESSAGE_TYPE=STATE_TRANSITION,CONSTRAINT_VALUE=ANY"
+    record.setMapField("constraint0", new TreeMap<String, String>());
+    record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint0").put("CONSTRAINT_VALUE", "ANY");
+    ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0"));
+
+    // constraint1:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,CONSTRAINT_VALUE=ANY"
+    record.setMapField("constraint1", new TreeMap<String, String>());
+    record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint1").put("TRANSITION", "OFFLINE-SLAVE");
+    record.getMapField("constraint1").put("CONSTRAINT_VALUE", "50");
+    ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1"));
+
+    // constraint2:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=TestDB,CONSTRAINT_VALUE=2";
+    record.setMapField("constraint2", new TreeMap<String, String>());
+    record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint2").put("TRANSITION", "OFFLINE-SLAVE");
+    record.getMapField("constraint2").put("INSTANCE", ".*");
+    record.getMapField("constraint2").put("RESOURCE", "TestDB");
+    record.getMapField("constraint2").put("CONSTRAINT_VALUE", "2");
+    ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
+
+    // constraint3:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=.*,CONSTRAINT_VALUE=1";
+    record.setMapField("constraint3", new TreeMap<String, String>());
+    record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint3").put("TRANSITION", "OFFLINE-SLAVE");
+    record.getMapField("constraint3").put("INSTANCE", "localhost_1");
+    record.getMapField("constraint3").put("RESOURCE", ".*");
+    record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
+    ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint3"));
+
+    // constraint4:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=.*,CONSTRAINT_VALUE=10"
+    record.setMapField("constraint4", new TreeMap<String, String>());
+    record.getMapField("constraint4").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint4").put("TRANSITION", "OFFLINE-SLAVE");
+    record.getMapField("constraint4").put("INSTANCE", ".*");
+    record.getMapField("constraint4").put("RESOURCE", ".*");
+    record.getMapField("constraint4").put("CONSTRAINT_VALUE", "10");
+    ConstraintItem constraint4 = new ConstraintItem(record.getMapField("constraint4"));
+
+    // constraint5:
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=TestDB,CONSTRAINT_VALUE=5"
+    record.setMapField("constraint5", new TreeMap<String, String>());
+    record.getMapField("constraint5").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint5").put("TRANSITION", "OFFLINE-SLAVE");
+    record.getMapField("constraint5").put("INSTANCE", "localhost_0");
+    record.getMapField("constraint5").put("RESOURCE", "TestDB");
+    record.getMapField("constraint5").put("CONSTRAINT_VALUE", "3");
+    ConstraintItem constraint5 = new ConstraintItem(record.getMapField("constraint5"));
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
+                         new ClusterConstraints(record));
+
+    // ClusterConstraints constraint =
+    // accessor.getProperty(ClusterConstraints.class,
+    // PropertyType.CONFIGS,
+    // ConfigScopeProperty.CONSTRAINT.toString(),
+    // ConstraintType.MESSAGE_CONSTRAINT.toString());
+    ClusterConstraints constraint =
+        accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
+
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
+
+    // test constraintSelection
+    // message1: hit contraintSelection rule1 and rule2
+    Message msg1 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-001",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Map<ConstraintAttribute, String> msgAttr =
+        ClusterConstraints.toConstraintAttributes(msg1);
+    Set<ConstraintItem> matches = constraint.match(msgAttr);
+    System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 5);
+    Assert.assertTrue(containsConstraint(matches, constraint0));
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint2));
+    Assert.assertTrue(containsConstraint(matches, constraint4));
+    Assert.assertTrue(containsConstraint(matches, constraint5));
+
+    matches = throttleStage.selectConstraints(matches, msgAttr);
+    System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 2);
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint5));
+
+    // message2: hit contraintSelection rule1, rule2, and rule3
+    Message msg2 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-002",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_1");
+
+    msgAttr = ClusterConstraints.toConstraintAttributes(msg2);
+    matches = constraint.match(msgAttr);
+    System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 5);
+    Assert.assertTrue(containsConstraint(matches, constraint0));
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint2));
+    Assert.assertTrue(containsConstraint(matches, constraint3));
+    Assert.assertTrue(containsConstraint(matches, constraint4));
+
+    matches = throttleStage.selectConstraints(matches, msgAttr);
+    System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
+    Assert.assertEquals(matches.size(), 2);
+    Assert.assertTrue(containsConstraint(matches, constraint1));
+    Assert.assertTrue(containsConstraint(matches, constraint3));
+
+    // test messageThrottleStage
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+    runStage(event, new ResourceComputationStage());
+    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+
+    Message msg3 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-003",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg4 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-004",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg5 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-005",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_0");
+
+    Message msg6 =
+        createMessage(MessageType.STATE_TRANSITION,
+                      "msgId-006",
+                      "OFFLINE",
+                      "SLAVE",
+                      "TestDB",
+                      "localhost_1");
+
+    List<Message> selectMessages = new ArrayList<Message>();
+    selectMessages.add(msg1);
+    selectMessages.add(msg2);
+    selectMessages.add(msg3);
+    selectMessages.add(msg4);
+    selectMessages.add(msg5); // should be throttled
+    selectMessages.add(msg6); // should be throttled
+
+    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+
+    runStage(event, throttleStage);
+
+    MessageThrottleStageOutput msgThrottleOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    List<Message> throttleMessages =
+        msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+    Assert.assertEquals(throttleMessages.size(), 4);
+    Assert.assertTrue(throttleMessages.contains(msg1));
+    Assert.assertTrue(throttleMessages.contains(msg2));
+    Assert.assertTrue(throttleMessages.contains(msg3));
+    Assert.assertTrue(throttleMessages.contains(msg4));
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  private boolean containsConstraint(Set<ConstraintItem> constraints,
+                                     ConstraintItem constraint)
+  {
+    for (ConstraintItem item : constraints)
+    {
+      if (item.toString().equals(constraint.toString()))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // add pending message test case
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
new file mode 100644
index 0000000..fd9a7e2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -0,0 +1,118 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestMsgSelectionStage
+{
+  @Test
+  public void testMasterXfer()
+  {
+    System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
+
+    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
+    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
+    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
+
+    Map<String, String> currentStates = new HashMap<String, String>();
+    currentStates.put("localhost_0", "SLAVE");
+    currentStates.put("localhost_1", "MASTER");
+
+    Map<String, String> pendingStates = new HashMap<String, String>();
+
+    List<Message> messages = new ArrayList<Message>();
+    messages.add(TestHelper.createMessage("msgId_0",
+                                          "SLAVE",
+                                          "MASTER",
+                                          "localhost_0",
+                                          "TestDB",
+                                          "TestDB_0"));
+    messages.add(TestHelper.createMessage("msgId_1",
+                                          "MASTER",
+                                          "SLAVE",
+                                          "localhost_1",
+                                          "TestDB",
+                                          "TestDB_0"));
+
+    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+    stateConstraints.put("MASTER", new Bounds(0, 1));
+    stateConstraints.put("SLAVE", new Bounds(0, 2));
+
+    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+    stateTransitionPriorities.put("MASTER-SLAVE", 0);
+    stateTransitionPriorities.put("SLAVE-MASTER", 1);
+
+
+    List<Message> selectedMsg =
+        new MessageSelectionStage().selectMessages(liveInstances,
+                                                   currentStates,
+                                                   pendingStates,
+                                                   messages,
+                                                   stateConstraints,
+                                                   stateTransitionPriorities,
+                                                   "OFFLINE");
+
+    Assert.assertEquals(selectedMsg.size(), 1);
+    Assert.assertEquals(selectedMsg.get(0).getMsgId(), "msgId_1");
+    System.out.println("END testMasterXfer at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testMasterXferAfterMasterResume()
+  {
+    System.out.println("START testMasterXferAfterMasterResume at "
+        + new Date(System.currentTimeMillis()));
+
+    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
+    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
+    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
+
+    Map<String, String> currentStates = new HashMap<String, String>();
+    currentStates.put("localhost_0", "SLAVE");
+    currentStates.put("localhost_1", "SLAVE");
+
+    Map<String, String> pendingStates = new HashMap<String, String>();
+    pendingStates.put("localhost_1", "MASTER");
+
+    List<Message> messages = new ArrayList<Message>();
+    messages.add(TestHelper.createMessage("msgId_0",
+                                          "SLAVE",
+                                          "MASTER",
+                                          "localhost_0",
+                                          "TestDB",
+                                          "TestDB_0"));
+
+    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+    stateConstraints.put("MASTER", new Bounds(0, 1));
+    stateConstraints.put("SLAVE", new Bounds(0, 2));
+
+    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+    stateTransitionPriorities.put("MASTER-SLAVE", 0);
+    stateTransitionPriorities.put("SLAVE-MASTER", 1);
+
+    List<Message> selectedMsg =
+        new MessageSelectionStage().selectMessages(liveInstances,
+                                                   currentStates,
+                                                   pendingStates,
+                                                   messages,
+                                                   stateConstraints,
+                                                   stateTransitionPriorities,
+                                                   "OFFLINE");
+
+    Assert.assertEquals(selectedMsg.size(), 0);
+    System.out.println("END testMasterXferAfterMasterResume at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
new file mode 100644
index 0000000..ded4ec6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -0,0 +1,38 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase
+{
+  @Test
+  public void TestParse()
+  {
+    StatsAggregationStage stage = new StatsAggregationStage();
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    
+    String instanceName  = stage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
+    Assert.assertTrue(instanceName.equals("localhost_12918"));
+    
+    instanceName  = stage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
+    Assert.assertTrue(instanceName == null);
+    
+    
+    instanceName  = stage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
+    Assert.assertTrue(instanceName.equals("localhost_12922"));
+    
+    
+
+    String resourceName  = stage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency", manager);
+    Assert.assertTrue(resourceName.equals("TestDB"));
+    
+
+    String partitionName  = stage.parsePartitionName("localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
+    Assert.assertTrue(partitionName.equals("TestDB_22"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
new file mode 100644
index 0000000..5b9ace5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -0,0 +1,428 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestRebalancePipeline extends ZkUnitTestBase
+{
+  private static final Logger LOG        =
+                                             Logger.getLogger(TestRebalancePipeline.class.getName());
+  final String                _className = getShortClassName();
+
+  @Test
+  public void testDuplicateMsg()
+  {
+    String clusterName = "CLUSTER_" + _className + "_dup";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    final String resourceName = "testResource_dup";
+    String[] resourceGroups = new String[] { resourceName };
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new TaskAssignmentStage());
+
+    // round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
+    setCurrentState(clusterName,
+                    "localhost_0",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_0",
+                    "OFFLINE");
+    setCurrentState(clusterName,
+                    "localhost_1",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_1",
+                    "SLAVE");
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    MessageSelectionStageOutput msgSelOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    List<Message> messages =
+        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        1,
+                        "Should output 1 message: OFFLINE-SLAVE for node0");
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "OFFLINE");
+    Assert.assertEquals(message.getToState(), "SLAVE");
+    Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+    // round2: updates node0 currentState to SLAVE but keep the
+    // message, make sure controller should not send S->M until removal is done
+    setCurrentState(clusterName,
+                    "localhost_0",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_1",
+                    "SLAVE");
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        0,
+                        "Should NOT output 1 message: SLAVE-MASTER for node1");
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  public void testMsgTriggeredRebalance() throws Exception
+  {
+    String clusterName = "CLUSTER_" + _className + "_msgTrigger";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    ClusterEvent event = new ClusterEvent("testEvent");
+
+    final String resourceName = "testResource_dup";
+    String[] resourceGroups = new String[] { resourceName };
+
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[] { 0, 1 });
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+
+    // round1: controller sends O->S to both node0 and node1
+    Thread.sleep(1000);
+
+    Builder keyBuilder = accessor.keyBuilder();
+    List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+    Assert.assertEquals(messages.size(), 1);
+    messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
+    Assert.assertEquals(messages.size(), 1);
+
+    // round2: node0 and node1 update current states but not removing messages
+    // controller's rebalance pipeline should be triggered but since messages are not
+    // removed
+    // no new messages will be sent
+    setCurrentState(clusterName,
+                    "localhost_0",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_0",
+                    "SLAVE");
+    setCurrentState(clusterName,
+                    "localhost_1",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_1",
+                    "SLAVE");
+    Thread.sleep(1000);
+    messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+    Assert.assertEquals(messages.size(), 1);
+
+    messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
+    Assert.assertEquals(messages.size(), 1);
+
+    // round3: node0 removes message and controller's rebalance pipeline should be
+    // triggered
+    // and sends S->M to node0
+    messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+    accessor.removeProperty(keyBuilder.message("localhost_0", messages.get(0)));
+    Thread.sleep(1000);
+
+    messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+    Assert.assertEquals(messages.size(), 1);
+    ZNRecord msg =
+        accessor.getProperty(keyBuilder.message("localhost_0", messages.get(0)))
+                .getRecord();
+    String toState = msg.getSimpleField(Attributes.TO_STATE.toString());
+    Assert.assertEquals(toState, "MASTER");
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  public void testChangeIdealStateWithPendingMsg()
+  {
+    String clusterName = "CLUSTER_" + _className + "_pending";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    final String resourceName = "testResource_pending";
+    String[] resourceGroups = new String[] { resourceName };
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 0, 1 });
+    setupStateModel(clusterName);
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new TaskAssignmentStage());
+
+    // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
+    setCurrentState(clusterName,
+                    "localhost_0",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_0",
+                    "OFFLINE");
+    setCurrentState(clusterName,
+                    "localhost_1",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_1",
+                    "SLAVE");
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    MessageSelectionStageOutput msgSelOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    List<Message> messages =
+        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        1,
+                        "Should output 1 message: OFFLINE-SLAVE for node0");
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "OFFLINE");
+    Assert.assertEquals(message.getToState(), "SLAVE");
+    Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+    // round2: drop resource, but keep the
+    // message, make sure controller should not send O->DROPPEDN until O->S is done
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, resourceName);
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        1,
+                        "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
+
+    message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "SLAVE");
+    Assert.assertEquals(message.getToState(), "OFFLINE");
+    Assert.assertEquals(message.getTgtName(), "localhost_1");
+
+    // round3: remove O->S for localhost_0, controller should now send O->DROPPED to
+    // localhost_0
+    Builder keyBuilder = accessor.keyBuilder();
+    List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
+    accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        1,
+                        "Should output 1 message: OFFLINE->DROPPED for localhost_0");
+    message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "OFFLINE");
+    Assert.assertEquals(message.getToState(), "DROPPED");
+    Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  public void testMasterXfer()
+  {
+    String clusterName = "CLUSTER_" + _className + "_xfer";
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    final String resourceName = "testResource_xfer";
+    String[] resourceGroups = new String[] { resourceName };
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
+    setupLiveInstances(clusterName, new int[] { 1 });
+    setupStateModel(clusterName);
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new TaskAssignmentStage());
+
+    // round1: set node1 currentState to SLAVE
+    setCurrentState(clusterName,
+                    "localhost_1",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_1",
+                    "SLAVE");
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    MessageSelectionStageOutput msgSelOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    List<Message> messages =
+        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        1,
+                        "Should output 1 message: SLAVE-MASTER for node1");
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "SLAVE");
+    Assert.assertEquals(message.getToState(), "MASTER");
+    Assert.assertEquals(message.getTgtName(), "localhost_1");
+
+    // round2: updates node0 currentState to SLAVE but keep the
+    // message, make sure controller should not send S->M until removal is done
+    setupLiveInstances(clusterName, new int[] { 0 });
+    setCurrentState(clusterName,
+                    "localhost_0",
+                    resourceName,
+                    resourceName + "_0",
+                    "session_0",
+                    "SLAVE");
+
+    runPipeline(event, dataRefresh);
+    runPipeline(event, rebalancePipeline);
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    Assert.assertEquals(messages.size(),
+                        0,
+                        "Should NOT output 1 message: SLAVE-MASTER for node0");
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  protected void setCurrentState(String clusterName,
+                                 String instance,
+                                 String resourceGroupName,
+                                 String resourceKey,
+                                 String sessionId,
+                                 String state)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    CurrentState curState = new CurrentState(resourceGroupName);
+    curState.setState(resourceKey, state);
+    curState.setSessionId(sessionId);
+    curState.setStateModelDefRef("MasterSlave");
+    accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName),
+                         curState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
new file mode 100644
index 0000000..2ca2472
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -0,0 +1,245 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestResourceComputationStage extends BaseStageTest
+{
+  /**
+   * Case where we have one resource in IdealState
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimple() throws Exception
+  {
+    int nodes = 5;
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < nodes; i++)
+    {
+      instances.add("localhost_" + i);
+    }
+    int partitions = 10;
+    int replicas = 1;
+    String resourceName = "testResource";
+    ZNRecord record = IdealStateCalculatorForStorageNode.calculateIdealState(
+        instances, partitions, replicas, resourceName, "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(record);
+    idealState.setStateModelDefRef("MasterSlave");
+    
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.idealStates(resourceName),
+                                          idealState);
+    ResourceComputationStage stage = new ResourceComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+
+    Map<String, Resource> resource = event
+        .getAttribute(AttributeName.RESOURCES.toString());
+    AssertJUnit.assertEquals(1, resource.size());
+
+    AssertJUnit.assertEquals(resource.keySet().iterator().next(),
+        resourceName);
+    AssertJUnit.assertEquals(resource.values().iterator().next()
+        .getResourceName(), resourceName);
+    AssertJUnit.assertEquals(resource.values().iterator().next()
+        .getStateModelDefRef(), idealState.getStateModelDefRef());
+    AssertJUnit.assertEquals(resource.values().iterator().next()
+        .getPartitions().size(), partitions);
+  }
+
+  @Test
+  public void testMultipleResources() throws Exception
+  {
+//    List<IdealState> idealStates = new ArrayList<IdealState>();
+    String[] resources = new String[]
+        { "testResource1", "testResource2" };
+    List<IdealState> idealStates = setupIdealState(5, resources, 10, 1);
+    ResourceComputationStage stage = new ResourceComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+
+    Map<String, Resource> resourceMap = event
+        .getAttribute(AttributeName.RESOURCES.toString());
+    AssertJUnit.assertEquals(resources.length, resourceMap.size());
+
+    for (int i = 0; i < resources.length; i++)
+    {
+      String resourceName = resources[i];
+      IdealState idealState = idealStates.get(i);
+      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getResourceName(), resourceName);
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getStateModelDefRef(), idealState.getStateModelDefRef());
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getPartitions().size(), idealState.getNumPartitions());
+    }
+  }
+
+  @Test
+  public void testMultipleResourcesWithSomeDropped() throws Exception
+  {
+    int nodes = 5;
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < nodes; i++)
+    {
+      instances.add("localhost_" + i);
+    }
+    String[] resources = new String[]
+    { "testResource1", "testResource2" };
+    List<IdealState> idealStates = new ArrayList<IdealState>();
+    for (int i = 0; i < resources.length; i++)
+    {
+      int partitions = 10;
+      int replicas = 1;
+      String resourceName = resources[i];
+      ZNRecord record = IdealStateCalculatorForStorageNode
+          .calculateIdealState(instances, partitions, replicas,
+              resourceName, "MASTER", "SLAVE");
+      IdealState idealState = new IdealState(record);
+      idealState.setStateModelDefRef("MasterSlave");
+      
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.idealStates(resourceName),
+                                            idealState);
+
+
+      idealStates.add(idealState);
+    }
+    // ADD A LIVE INSTANCE WITH A CURRENT STATE THAT CONTAINS RESOURCE WHICH NO
+    // LONGER EXISTS IN IDEALSTATE
+    String instanceName = "localhost_" + 3;
+    LiveInstance liveInstance = new LiveInstance(instanceName);
+    String sessionId = UUID.randomUUID().toString();
+    liveInstance.setSessionId(sessionId);
+    
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.liveInstance(instanceName),
+                                          liveInstance);
+
+    String oldResource = "testResourceOld";
+    CurrentState currentState = new CurrentState(oldResource);
+    currentState.setState("testResourceOld_0", "OFFLINE");
+    currentState.setState("testResourceOld_1", "SLAVE");
+    currentState.setState("testResourceOld_2", "MASTER");
+    currentState.setStateModelDefRef("MasterSlave");
+    accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
+                                          currentState);
+
+    ResourceComputationStage stage = new ResourceComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+
+    Map<String, Resource> resourceMap = event
+        .getAttribute(AttributeName.RESOURCES.toString());
+    // +1 because it will have one for current state
+    AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
+
+    for (int i = 0; i < resources.length; i++)
+    {
+      String resourceName = resources[i];
+      IdealState idealState = idealStates.get(i);
+      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getResourceName(), resourceName);
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getStateModelDefRef(), idealState.getStateModelDefRef());
+      AssertJUnit.assertEquals(resourceMap.get(resourceName)
+          .getPartitions().size(), idealState.getNumPartitions());
+    }
+    // Test the data derived from CurrentState
+    AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
+    AssertJUnit.assertEquals(resourceMap.get(oldResource)
+        .getResourceName(), oldResource);
+    AssertJUnit.assertEquals(resourceMap.get(oldResource)
+        .getStateModelDefRef(), currentState.getStateModelDefRef());
+    AssertJUnit
+        .assertEquals(resourceMap.get(oldResource).getPartitions()
+            .size(), currentState.getPartitionStateMap().size());
+    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+
+  }
+
+  @Test
+  public void testNull()
+  {
+    ClusterEvent event = new ClusterEvent("sampleEvent");
+    ResourceComputationStage stage = new ResourceComputationStage();
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    boolean exceptionCaught = false;
+    try
+    {
+      stage.process(event);
+    } catch (Exception e)
+    {
+      exceptionCaught = true;
+    }
+    AssertJUnit.assertTrue(exceptionCaught);
+    stage.postProcess();
+  }
+
+
+//  public void testEmptyCluster()
+//  {
+//    ClusterEvent event = new ClusterEvent("sampleEvent");
+//    ClusterManager manager = new Mocks.MockManager();
+//    event.addAttribute("clustermanager", manager);
+//    ResourceComputationStage stage = new ResourceComputationStage();
+//    StageContext context = new StageContext();
+//    stage.init(context);
+//    stage.preProcess();
+//    boolean exceptionCaught = false;
+//    try
+//    {
+//      stage.process(event);
+//    } catch (Exception e)
+//    {
+//      exceptionCaught = true;
+//    }
+//    Assert.assertTrue(exceptionCaught);
+//    stage.postProcess();
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
new file mode 100644
index 0000000..d21d02d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockEspressoHealthReportProvider;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAddDropAlert extends ZkIntegrationTestBase
+{
+  ZkClient _zkClient;
+  protected ClusterSetup _setupTool = null;
+  protected final String _alertStr =
+      "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
+  protected final String _alertStatusStr = _alertStr; // +" : (*)";
+  protected final String _dbName = "TestDB0";
+
+  @BeforeClass()
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  public class AddDropAlertTransition extends MockTransition
+  {
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      HelixManager manager = context.getManager();
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      String fromState = message.getFromState();
+      String toState = message.getToState();
+      String instance = message.getTgtName();
+      String partition = message.getPartitionName();
+
+      if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
+      {
+
+        // add a stat and report to ZK
+        // perhaps should keep reporter per instance...
+        ParticipantHealthReportCollectorImpl reporter =
+            new ParticipantHealthReportCollectorImpl(manager, instance);
+        MockEspressoHealthReportProvider provider =
+            new MockEspressoHealthReportProvider();
+        reporter.addHealthReportProvider(provider);
+        String statName = "latency";
+        provider.setStat(_dbName, statName, "15");
+        reporter.transmitHealthReports();
+
+        // sleep long enough for first set of alerts to report and alert to get deleted
+        // then change reported data
+        try
+        {
+          Thread.sleep(10000);
+        }
+        catch (InterruptedException e)
+        {
+          System.err.println("Error sleeping");
+        }
+        provider.setStat(_dbName, statName, "1");
+        reporter.transmitHealthReports();
+
+        /*
+         * for (int i = 0; i < 5; i++) { accessor.setProperty(PropertyType.HEALTHREPORT,
+         * new ZNRecord("mockAlerts" + i), instance, "mockAlerts"); try {
+         * Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated
+         * catch block e.printStackTrace(); } }
+         */
+      }
+    }
+  }
+
+  @Test()
+  public void testAddDropAlert() throws Exception
+  {
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    System.out.println("START TestAddDropAlert at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource group
+                            5, // number of nodes //change back to 5!!!
+                            1, // replicas //change back to 3!!!
+                            "MasterSlave",
+                            true); // do rebalance
+    // enableHealthCheck(clusterName);
+
+    _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
+
+    StartCMResult cmResult =
+        TestHelper.startController(clusterName,
+                                   "controller_0",
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++) // !!!change back to 5
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] =
+          new MockParticipant(clusterName,
+                              instanceName,
+                              ZK_ADDR,
+                              new AddDropAlertTransition());
+      participants[i].syncStart();
+//      new Thread(participants[i]).start();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+
+    // drop alert soon after adding, but leave enough time for alert to fire once
+    // Thread.sleep(3000);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    String instance = "localhost_12918";
+    ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    Map<String, Map<String, String>> recMap = record.getMapFields();
+    Set<String> keySet = recMap.keySet();
+    Assert.assertTrue(keySet.size() > 0);
+
+    _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
+    new HealthStatsAggregationTask(cmResult._manager).run();
+    // other verifications go here
+    // for (int i = 0; i < 1; i++) //change 1 back to 5
+    // {
+    // String instance = "localhost_" + (12918 + i);
+    record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
+    recMap = record.getMapFields();
+    keySet = recMap.keySet();
+    Assert.assertEquals(keySet.size(), 0);
+    // }
+
+    System.out.println("END TestAddDropAlert at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
new file mode 100644
index 0000000..5699119
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -0,0 +1,189 @@
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAlertActionTriggering extends
+    ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  String _statName = "TestStat@DB=db1";
+  String _stat = "TestStat";
+  String metricName1 = "TestMetric1";
+  String metricName2 = "TestMetric2";
+  void setHealthData(int[] val1, int[] val2)
+  {
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      ZNRecord record = new ZNRecord(_stat);
+      Map<String, String> valMap = new HashMap<String, String>();
+      valMap.put(metricName1, val1[i] + "");
+      valMap.put(metricName2, val2[i] + "");
+      record.setSimpleField("TimeStamp", new Date().getTime() + "");
+      record.setMapField(_statName, valMap);
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      helixDataAccessor
+        .setProperty(keyBuilder.healthReport( manager.getInstanceName(), record.getId()), new HealthStat(record));
+    }
+    try
+    {
+      Thread.sleep(1000);
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  void setHealthData2(int[] val1)
+  {
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      ZNRecord record = new ZNRecord(_stat);
+      Map<String, String> valMap = new HashMap<String, String>();
+      valMap.put(metricName2, val1[i] + "");
+      record.setSimpleField("TimeStamp", new Date().getTime() + "");
+      record.setMapField("TestStat@DB=TestDB;Partition=TestDB_3", valMap);
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      helixDataAccessor
+        .setProperty(keyBuilder.healthReport( manager.getInstanceName(), record.getId()), new HealthStat(record));
+    }
+    try
+    {
+      Thread.sleep(1000);
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testAlertActionDisableNode() throws InterruptedException
+  {
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("healthChange.enabled", "true");
+    _setupTool.getClusterManagementTool().setConfig(scope, properties);
+
+    String alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)ACTION(DISABLE_INSTANCE)";
+    String alertStr2 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(120)ACTION(DISABLE_INSTANCE)";
+    String alertStr3 = "EXP(decay(1.0)(localhost_*.TestStat@DB=TestDB;Partition=*.TestMetric2))CMP(GREATER)CON(160)ACTION(DISABLE_PARTITION)";
+
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr1);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr2);
+    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr3);
+
+    int[] metrics1 = {10, 15, 22, 12, 16};
+    int[] metrics2 = {22, 115, 22, 163,16};
+    int[] metrics3 = {0, 0, 0, 0, 0};
+    setHealthData(metrics1, metrics2);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+
+    HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
+    task.run();
+    Thread.sleep(4000);
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    Builder kb = manager.getHelixDataAccessor().keyBuilder();
+    ExternalView externalView = manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
+    // Test the DISABLE_INSTANCE alerts
+    String participant1 = "localhost_" + (START_PORT + 3);
+    String participant2 = "localhost_" + (START_PORT + 2);
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
+    String isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
+    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
+
+    scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
+    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
+    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
+
+    for(String partitionName : externalView.getRecord().getMapFields().keySet())
+    {
+      for(String hostName :  externalView.getRecord().getMapField(partitionName).keySet())
+      {
+        if(hostName.equals(participant1) || hostName.equals(participant2))
+        {
+          Assert.assertEquals(externalView.getRecord().getMapField(partitionName).get(hostName), "OFFLINE");
+        }
+      }
+    }
+
+    // enable the disabled instances
+    setHealthData(metrics3, metrics3);
+    task.run();
+    Thread.sleep(1000);
+
+    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
+    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant1, true);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // Test the DISABLE_PARTITION case
+    int[] metrics4 = {22, 115, 22, 16,163};
+    setHealthData2(metrics4);
+    task.run();
+
+    scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
+    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
+    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
+
+    scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
+    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
+    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+    String participant3 = "localhost_" + (START_PORT + 4);
+    externalView = manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
+    Assert.assertTrue(externalView.getRecord().getMapField("TestDB_3").get(participant3).equalsIgnoreCase("OFFLINE"));
+
+    InstanceConfig nodeConfig =
+        helixDataAccessor.getProperty(keyBuilder.instanceConfig(participant3));
+    Assert.assertTrue(
+        nodeConfig.getRecord().getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString())
+          .contains("TestDB_3"));
+
+  }
+}