You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/02/23 22:21:53 UTC

[08/11] ambari git commit: AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 2a4cec8..c62352a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -54,10 +54,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import javax.xml.bind.JAXBException;
 
@@ -65,47 +63,28 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
-import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
-import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.agent.HostStatus.Status;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.HostDAO;
-import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
-import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
-import org.apache.ambari.server.orm.dao.StackDAO;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
-import org.apache.ambari.server.orm.entities.HostEntity;
-import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
-import org.apache.ambari.server.orm.entities.ResourceEntity;
-import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
-import org.apache.ambari.server.orm.entities.StackEntity;
-import org.apache.ambari.server.security.authorization.ResourceType;
 import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriter;
 import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriterFactory;
 import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
 import org.apache.ambari.server.state.Alert;
-import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostHealthStatus;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.SecurityState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponentHost;
@@ -113,9 +92,6 @@ import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
-import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
@@ -128,7 +104,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonObject;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -145,8 +120,6 @@ public class TestHeartbeatHandler {
   long requestId = 23;
   long stageId = 31;
 
-  private final static StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
-
   @Inject
   AmbariMetaInfo metaInfo;
 
@@ -157,26 +130,14 @@ public class TestHeartbeatHandler {
   ActionDBAccessor actionDBAccessor;
 
   @Inject
-  OrmTestHelper helper;
-
-  @Inject
-  ResourceTypeDAO resourceTypeDAO;
-
-  @Inject
-  StackDAO stackDAO;
-
-  @Inject
-  ClusterDAO clusterDAO;
-
-  @Inject
-  HostDAO hostDAO;
-
-  @Inject
   StageFactory stageFactory;
 
   @Inject
   HostRoleCommandFactory hostRoleCommandFactory;
 
+  @Inject
+  HeartbeatTestHelper heartbeatTestHelper;
+
   private UnitOfWork unitOfWork;
 
   @Rule
@@ -187,18 +148,7 @@ public class TestHeartbeatHandler {
 
   @Before
   public void setup() throws Exception {
-    module = new InMemoryDefaultTestModule(){
-
-      @Override
-      protected void configure() {
-        getProperties().put("recovery.type", "FULL");
-        getProperties().put("recovery.lifetime_max_count", "10");
-        getProperties().put("recovery.max_count", "4");
-        getProperties().put("recovery.window_in_minutes", "23");
-        getProperties().put("recovery.retry_interval", "2");
-        super.configure();
-      }
-    };
+    module = HeartbeatTestHelper.getTestModule();
     injector = Guice.createInjector(module);
     injector.getInstance(GuiceJpaInitializer.class);
     clusters = injector.getInstance(Clusters.class);
@@ -215,7 +165,7 @@ public class TestHeartbeatHandler {
   @Test
   @SuppressWarnings("unchecked")
   public void testHeartbeat() throws Exception {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>());
     replay(am);
     Clusters fsm = clusters;
@@ -257,387 +207,16 @@ public class TestHeartbeatHandler {
     assertEquals(0, aq.dequeueAll(DummyHostname1).size());
   }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testHeartbeatWithConfigs() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
-    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.INSTALLED);
-    serviceComponentHost2.setState(State.INSTALLED);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setResponseId(0);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setHostname(DummyHostname1);
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setServiceName(HDFS);
-    cr.setTaskId(1);
-    cr.setRole(DATANODE);
-    cr.setStatus("COMPLETED");
-    cr.setStdErr("");
-    cr.setStdOut("");
-    cr.setExitCode(215);
-    cr.setRoleCommand("START");
-    cr.setClusterName(DummyCluster);
-
-    cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
-      put("global", new HashMap<String, String>() {{
-        put("tag", "version1");
-      }});
-    }});
-
-    reports.add(cr);
-    hb.setReports(reports);
-
-    HostEntity host1 = hostDAO.findByName(DummyHostname1);
-    Assert.assertNotNull(host1);
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-            }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-
-    // the heartbeat test passed if actual configs is populated
-    Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
-    Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testRestartRequiredAfterInstallClient() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(HDFS_CLIENT).persist();
-    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
-
-    serviceComponentHost.setState(State.INSTALLED);
-    serviceComponentHost.setRestartRequired(true);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setResponseId(0);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setHostname(DummyHostname1);
-
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setServiceName(HDFS);
-    cr.setRoleCommand("INSTALL");
-    cr.setCustomCommand("EXECUTION_COMMAND");
-    cr.setTaskId(1);
-    cr.setRole(HDFS_CLIENT);
-    cr.setStatus("COMPLETED");
-    cr.setStdErr("");
-    cr.setStdOut("");
-    cr.setExitCode(215);
-    cr.setClusterName(DummyCluster);
-    cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
-      put("global", new HashMap<String, String>() {{
-        put("tag", "version1");
-      }});
-    }});
-    reports.add(cr);
-    hb.setReports(reports);
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-        Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-      new ArrayList<HostRoleCommand>() {{
-        add(command);
-        add(command);
-      }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-
-    Assert.assertNotNull(serviceComponentHost.getActualConfigs());
-    Assert.assertFalse(serviceComponentHost.isRestartRequired());
-    Assert.assertEquals(serviceComponentHost.getActualConfigs().size(), 1);
-
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testHeartbeatCustomCommandWithConfigs() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
-    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.INSTALLED);
-    serviceComponentHost2.setState(State.INSTALLED);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setResponseId(0);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setHostname(DummyHostname1);
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setServiceName(HDFS);
-    cr.setRoleCommand("CUSTOM_COMMAND");
-    cr.setCustomCommand("RESTART");
-    cr.setTaskId(1);
-    cr.setRole(DATANODE);
-    cr.setStatus("COMPLETED");
-    cr.setStdErr("");
-    cr.setStdOut("");
-    cr.setExitCode(215);
-    cr.setClusterName(DummyCluster);
-    cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
-      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
-    }});
-    CommandReport crn = new CommandReport();
-    crn.setActionId(StageUtils.getActionId(requestId, stageId));
-    crn.setServiceName(HDFS);
-    crn.setRoleCommand("CUSTOM_COMMAND");
-    crn.setCustomCommand("START");
-    crn.setTaskId(1);
-    crn.setRole(NAMENODE);
-    crn.setStatus("COMPLETED");
-    crn.setStdErr("");
-    crn.setStdOut("");
-    crn.setExitCode(215);
-    crn.setClusterName(DummyCluster);
-    crn.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
-      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
-    }});
-
-    reports.add(cr);
-    reports.add(crn);
-    hb.setReports(reports);
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-      Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-              add(command);
-            }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-
-    // the heartbeat test passed if actual configs is populated
-    Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
-    Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
-    Assert.assertNotNull(serviceComponentHost2.getActualConfigs());
-    Assert.assertEquals(serviceComponentHost2.getActualConfigs().size(), 1);
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testHeartbeatCustomStartStop() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
-    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.INSTALLED);
-    serviceComponentHost2.setState(State.STARTED);
-    serviceComponentHost1.setRestartRequired(true);
-    serviceComponentHost2.setRestartRequired(true);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setResponseId(0);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setHostname(DummyHostname1);
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setServiceName(HDFS);
-    cr.setRoleCommand("CUSTOM_COMMAND");
-    cr.setCustomCommand("START");
-    cr.setTaskId(1);
-    cr.setRole(DATANODE);
-    cr.setStatus("COMPLETED");
-    cr.setStdErr("");
-    cr.setStdOut("");
-    cr.setExitCode(215);
-    cr.setClusterName(DummyCluster);
-    CommandReport crn = new CommandReport();
-    crn.setActionId(StageUtils.getActionId(requestId, stageId));
-    crn.setServiceName(HDFS);
-    crn.setRoleCommand("CUSTOM_COMMAND");
-    crn.setCustomCommand("STOP");
-    crn.setTaskId(1);
-    crn.setRole(NAMENODE);
-    crn.setStatus("COMPLETED");
-    crn.setStdErr("");
-    crn.setStdOut("");
-    crn.setExitCode(215);
-    crn.setClusterName(DummyCluster);
-
-    reports.add(cr);
-    reports.add(crn);
-    hb.setReports(reports);
-
-    assertTrue(serviceComponentHost1.isRestartRequired());
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-      Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-      new ArrayList<HostRoleCommand>() {{
-        add(command);
-        add(command);
-      }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-
-    // the heartbeat test passed if actual configs is populated
-    State componentState1 = serviceComponentHost1.getState();
-    assertEquals(State.STARTED, componentState1);
-    assertFalse(serviceComponentHost1.isRestartRequired());
-    State componentState2 = serviceComponentHost2.getState();
-    assertEquals(State.INSTALLED, componentState2);
-    assertTrue(serviceComponentHost2.isRestartRequired());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testStatusHeartbeat() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
-    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
 
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(SECONDARY_NAMENODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.INSTALLED);
-    serviceComponentHost1.setSecurityState(SecurityState.UNSECURED);
-    serviceComponentHost2.setState(State.INSTALLED);
-    serviceComponentHost2.setSecurityState(SecurityState.SECURING);
-    serviceComponentHost3.setState(State.STARTING);
 
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setReports(new ArrayList<CommandReport>());
-    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
-    ComponentStatus componentStatus1 = new ComponentStatus();
-    componentStatus1.setClusterName(DummyCluster);
-    componentStatus1.setServiceName(HDFS);
-    componentStatus1.setMessage(DummyHostStatus);
-    componentStatus1.setStatus(State.STARTED.name());
-    componentStatus1.setSecurityState(SecurityState.SECURED_KERBEROS.name());
-    componentStatus1.setComponentName(DATANODE);
-    componentStatuses.add(componentStatus1);
-    ComponentStatus componentStatus2 = new ComponentStatus();
-    componentStatus2.setClusterName(DummyCluster);
-    componentStatus2.setServiceName(HDFS);
-    componentStatus2.setMessage(DummyHostStatus);
-    componentStatus2.setStatus(State.STARTED.name());
-    componentStatus2.setSecurityState(SecurityState.UNSECURED.name());
-    componentStatus2.setComponentName(SECONDARY_NAMENODE);
-    componentStatuses.add(componentStatus2);
-    hb.setComponentStatus(componentStatuses);
 
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-      new ArrayList<HostRoleCommand>() {{
-        add(command);
-        add(command);
-      }});
-    replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    State componentState1 = serviceComponentHost1.getState();
-    State componentState2 = serviceComponentHost2.getState();
-    State componentState3 = serviceComponentHost3.getState();
-    assertEquals(State.STARTED, componentState1);
-    assertEquals(SecurityState.SECURED_KERBEROS, serviceComponentHost1.getSecurityState());
-    assertEquals(State.INSTALLED, componentState2);
-    assertEquals(SecurityState.SECURING, serviceComponentHost2.getSecurityState());
-    assertEquals(State.STARTED, componentState3);
-    assertEquals(SecurityState.UNSECURED, serviceComponentHost3.getSecurityState());
-  }
 
   @Test
   @SuppressWarnings("unchecked")
   public void testStatusHeartbeatWithAnnotation() throws Exception {
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -658,14 +237,14 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
             }}).anyTimes();
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
     HeartBeatResponse resp = handler.handleHeartBeat(hb);
     Assert.assertFalse(resp.hasMappedComponents());
 
@@ -689,7 +268,7 @@ public class TestHeartbeatHandler {
   @Test
   @SuppressWarnings("unchecked")
   public void testLiveStatusUpdateAfterStopFailed() throws Exception {
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -743,7 +322,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -751,82 +330,25 @@ public class TestHeartbeatHandler {
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
     State componentState1 = serviceComponentHost1.getState();
     State componentState2 = serviceComponentHost2.getState();
     assertEquals(State.STARTED, componentState1);
     assertEquals(State.INSTALLED, componentState2);
   }
 
+
   @Test
-  public void testCommandReport() throws AmbariException {
-    injector.injectMembers(this);
-    clusters.addHost(DummyHostname1);
-    clusters.getHost(DummyHostname1).persist();
-
-    StackId dummyStackId = new StackId(DummyStackId);
-    clusters.addCluster(DummyCluster, dummyStackId);
-
-    ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null, null);
-    populateActionDB(db, DummyHostname1);
-    Stage stage = db.getAllStages(requestId).get(0);
-    Assert.assertEquals(stageId, stage.getStageId());
-    stage.setHostRoleStatus(DummyHostname1, HBASE_MASTER, HostRoleStatus.QUEUED);
-    db.hostRoleScheduled(stage, DummyHostname1, HBASE_MASTER);
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setTaskId(1);
-    cr.setRole(HBASE_MASTER);
-    cr.setStatus("COMPLETED");
-    cr.setStdErr("");
-    cr.setStdOut("");
-    cr.setExitCode(215);
-
-    cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
-        put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
-      }});
-
-
-    reports.add(cr);
-    am.processTaskResponse(DummyHostname1, reports, stage.getOrderedHostRoleCommands());
-    assertEquals(215,
-            am.getAction(requestId, stageId).getExitCode(DummyHostname1, HBASE_MASTER));
-    assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
-            .getHostRoleStatus(DummyHostname1, HBASE_MASTER));
-    Stage s = db.getAllStages(requestId).get(0);
-    assertEquals(HostRoleStatus.COMPLETED,
-            s.getHostRoleStatus(DummyHostname1, HBASE_MASTER));
-    assertEquals(215,
-            s.getExitCode(DummyHostname1, HBASE_MASTER));
-  }
-
-  private void populateActionDB(ActionDBAccessor db, String DummyHostname1) throws AmbariException {
-    Stage s = stageFactory.createNew(requestId, "/a/b", DummyCluster, 1L, "heartbeat handler test",
-      "clusterHostInfo", "commandParamsStage", "hostParamsStage");
-    s.setStageId(stageId);
-    String filename = null;
-    s.addHostRoleExecutionCommand(DummyHostname1, Role.HBASE_MASTER,
-        RoleCommand.START,
-        new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
-            DummyHostname1, System.currentTimeMillis()), DummyCluster, HBASE, false, false);
-    List<Stage> stages = new ArrayList<Stage>();
-    stages.add(s);
-    Request request = new Request(stages, clusters);
-    db.persistActions(request);
-  }
-
-  @Test
-  public void testRegistration() throws AmbariException,
-      InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
-    replay(am);
-    Clusters fsm = clusters;
-    HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
-        injector);
+  public void testRegistration() throws AmbariException,
+      InvalidStateTransitionException {
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    replay(am);
+    Clusters fsm = clusters;
+    HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
+        injector);
     clusters.addHost(DummyHostname1);
     Host hostObject = clusters.getHost(DummyHostname1);
     hostObject.setIPv4("ipv4");
@@ -853,12 +375,12 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationRecoveryConfig() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
                                                     injector);
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -912,12 +434,12 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationRecoveryConfigMaintenanceMode()
           throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
             injector);
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -963,7 +485,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationAgentConfig() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -996,7 +518,7 @@ public class TestHeartbeatHandler {
   public void testRegistrationWithBadVersion() throws AmbariException,
       InvalidStateTransitionException {
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1037,7 +559,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1070,7 +592,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testInvalidOSRegistration() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1099,7 +621,7 @@ public class TestHeartbeatHandler {
   public void testIncompatibleAgentRegistration() throws AmbariException,
           InvalidStateTransitionException {
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1127,7 +649,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegisterNewNode()
       throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
@@ -1220,7 +742,7 @@ public class TestHeartbeatHandler {
     HeartbeatMonitor hm = mock(HeartbeatMonitor.class);
     when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     ActionQueue actionQueue = new ActionQueue();
@@ -1248,7 +770,7 @@ public class TestHeartbeatHandler {
   @Test
   @SuppressWarnings("unchecked")
   public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -1289,15 +811,16 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, RoleCommand.INSTALL);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
+    handler.getHeartbeatProcessor().processHeartbeat(hb);
     State componentState1 = serviceComponentHost1.getState();
     assertEquals("Host state should still be installing", State.INSTALLING, componentState1);
   }
@@ -1305,7 +828,7 @@ public class TestHeartbeatHandler {
   @Test
   @SuppressWarnings("unchecked")
   public void testOPFailedEventForAbortedTask() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -1342,494 +865,45 @@ public class TestHeartbeatHandler {
 
     List<CommandReport> reports = new ArrayList<CommandReport>();
     CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(1, 1));
-    cr.setTaskId(1);
-    cr.setClusterName(DummyCluster);
-    cr.setServiceName(HDFS);
-    cr.setRole(DATANODE);
-    cr.setRoleCommand("INSTALL");
-    cr.setStatus("FAILED");
-    cr.setStdErr("none");
-    cr.setStdOut("dummy output");
-    cr.setExitCode(777);
-    reports.add(cr);
-    hb.setReports(reports);
-    hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-            }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    State componentState1 = serviceComponentHost1.getState();
-    assertEquals("Host state should still be installing", State.INSTALLING,
-      componentState1);
-  }
-
-  /**
-   * Tests the fact that when START and STOP commands are in progress, and heartbeat
-   * forces the host component state to STARTED or INSTALLED, there are no undesired
-   * side effects.
-   * @throws AmbariException
-   * @throws InvalidStateTransitionException
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testCommandReportOnHeartbeatUpdatedState()
-      throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.INSTALLED);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setTaskId(1);
-    cr.setClusterName(DummyCluster);
-    cr.setServiceName(HDFS);
-    cr.setRole(DATANODE);
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setStdErr("none");
-    cr.setStdOut("dummy output");
-    cr.setExitCode(777);
-    cr.setRoleCommand("START");
-    reports.add(cr);
-    hb.setReports(reports);
-    hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-            }}).anyTimes();
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should  be " + State.INSTALLED,
-        State.INSTALLED, serviceComponentHost1.getState());
-
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(1);
-    cr.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr.setExitCode(0);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.STARTED,
-        State.STARTED, serviceComponentHost1.getState());
-
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(2);
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setRoleCommand("STOP");
-    cr.setExitCode(777);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.STARTED,
-        State.STARTED, serviceComponentHost1.getState());
-
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(3);
-    cr.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr.setExitCode(0);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.INSTALLED,
-        State.INSTALLED, serviceComponentHost1.getState());
-
-    // validate the transitions when there is no heartbeat
-    serviceComponentHost1.setState(State.STARTING);
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setExitCode(777);
-    cr.setRoleCommand("START");
-    hb.setResponseId(4);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.STARTING,
-        State.STARTING, serviceComponentHost1.getState());
-
-    cr.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr.setExitCode(0);
-    hb.setResponseId(5);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.STARTED,
-        State.STARTED, serviceComponentHost1.getState());
-
-    serviceComponentHost1.setState(State.STOPPING);
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setExitCode(777);
-    cr.setRoleCommand("STOP");
-    hb.setResponseId(6);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.STOPPING,
-        State.STOPPING, serviceComponentHost1.getState());
-
-    cr.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr.setExitCode(0);
-    hb.setResponseId(7);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.INSTALLED,
-        State.INSTALLED, serviceComponentHost1.getState());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-
-    ActionQueue aq = new ActionQueue();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    serviceComponentHost1.setState(State.UPGRADING);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-
-    List<CommandReport> reports = new ArrayList<CommandReport>();
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setTaskId(1);
-    cr.setClusterName(DummyCluster);
-    cr.setServiceName(HDFS);
-    cr.setRole(DATANODE);
-    cr.setRoleCommand("INSTALL");
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setStdErr("none");
-    cr.setStdOut("dummy output");
-    cr.setExitCode(777);
-    reports.add(cr);
-    hb.setReports(reports);
-    hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-            }}).anyTimes();
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should  be " + State.UPGRADING,
-        State.UPGRADING, serviceComponentHost1.getState());
-
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(1);
-    cr.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr.setExitCode(0);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.INSTALLED,
-        State.INSTALLED, serviceComponentHost1.getState());
-
-    serviceComponentHost1.setState(State.UPGRADING);
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(2);
-    cr.setStatus(HostRoleStatus.FAILED.toString());
-    cr.setExitCode(3);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.UPGRADING,
-        State.UPGRADING, serviceComponentHost1.getState());
-
-    serviceComponentHost1.setState(State.UPGRADING);
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(3);
-    cr.setStatus(HostRoleStatus.PENDING.toString());
-    cr.setExitCode(55);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.UPGRADING,
-        State.UPGRADING, serviceComponentHost1.getState());
-
-    serviceComponentHost1.setState(State.UPGRADING);
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(4);
-    cr.setStatus(HostRoleStatus.QUEUED.toString());
-    cr.setExitCode(55);
-
-    handler.handleHeartBeat(hb);
-    assertEquals("Host state should be " + State.UPGRADING,
-        State.UPGRADING, serviceComponentHost1.getState());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testStatusHeartbeatWithVersion() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(HDFS_CLIENT).persist();
-    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
-        getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
-
-    StackId stack130 = new StackId("HDP-1.3.0");
-    StackId stack120 = new StackId("HDP-1.2.0");
-
-    serviceComponentHost1.setState(State.INSTALLED);
-    serviceComponentHost2.setState(State.STARTED);
-    serviceComponentHost3.setState(State.STARTED);
-    serviceComponentHost1.setStackVersion(stack130);
-    serviceComponentHost2.setStackVersion(stack120);
-    serviceComponentHost3.setStackVersion(stack120);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setReports(new ArrayList<CommandReport>());
-    hb.setAgentEnv(new AgentEnv());
-    hb.setMounts(new ArrayList<DiskInfo>());
-
-    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
-    ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED,
-        SecurityState.UNSECURED, DATANODE, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
-    ComponentStatus componentStatus2 =
-        createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, SecurityState.UNSECURED, NAMENODE, "");
-    ComponentStatus componentStatus3 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED,
-        SecurityState.UNSECURED, HDFS_CLIENT, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
-
-    componentStatuses.add(componentStatus1);
-    componentStatuses.add(componentStatus2);
-    componentStatuses.add(componentStatus3);
-    hb.setComponentStatus(componentStatuses);
-
-    ActionQueue aq = new ActionQueue();
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-            }});
-    replay(am);
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
-        stack130, serviceComponentHost1.getStackVersion());
-    assertEquals("Matching value " + serviceComponentHost2.getStackVersion(),
-        stack120, serviceComponentHost2.getStackVersion());
-    assertEquals("Matching value " + serviceComponentHost3.getStackVersion(),
-        stack130, serviceComponentHost3.getStackVersion());
-    assertTrue(hb.getAgentEnv().getHostHealth().getServerTimeStampAtReporting() >= hb.getTimestamp());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(HDFS_CLIENT).persist();
-    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-
-    StackId stack130 = new StackId("HDP-1.3.0");
-    StackId stack120 = new StackId("HDP-1.2.0");
-
-    serviceComponentHost1.setState(State.UPGRADING);
-    serviceComponentHost2.setState(State.INSTALLING);
-
-    serviceComponentHost1.setStackVersion(stack120);
-    serviceComponentHost1.setDesiredStackVersion(stack130);
-    serviceComponentHost2.setStackVersion(stack120);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    CommandReport cr1 = new CommandReport();
-    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr1.setTaskId(1);
-    cr1.setClusterName(DummyCluster);
-    cr1.setServiceName(HDFS);
-    cr1.setRole(DATANODE);
-    cr1.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr1.setStdErr("none");
-    cr1.setStdOut("dummy output");
-    cr1.setExitCode(0);
-    cr1.setRoleCommand(RoleCommand.UPGRADE.toString());
-
-    CommandReport cr2 = new CommandReport();
-    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr2.setTaskId(2);
-    cr2.setClusterName(DummyCluster);
-    cr2.setServiceName(HDFS);
-    cr2.setRole(NAMENODE);
-    cr2.setStatus(HostRoleStatus.COMPLETED.toString());
-    cr2.setStdErr("none");
-    cr2.setStdOut("dummy output");
-    cr2.setExitCode(0);
-    cr2.setRoleCommand(RoleCommand.UPGRADE.toString());
-    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
-    reports.add(cr1);
-    reports.add(cr2);
-    hb.setReports(reports);
-
-    ActionQueue aq = new ActionQueue();
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-              add(command);
-            }});
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    assertEquals("Stack version for SCH should be updated to " +
-            serviceComponentHost1.getDesiredStackVersion(),
-            stack130, serviceComponentHost1.getStackVersion());
-    assertEquals("Stack version for SCH should not change ",
-            stack120, serviceComponentHost2.getStackVersion());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(NAMENODE).persist();
-    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.addServiceComponent(HDFS_CLIENT).persist();
-    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
-    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-
-    StackId stack130 = new StackId("HDP-1.3.0");
-    StackId stack120 = new StackId("HDP-1.2.0");
-
-    serviceComponentHost1.setState(State.UPGRADING);
-    serviceComponentHost2.setState(State.INSTALLING);
-
-    serviceComponentHost1.setStackVersion(stack120);
-    serviceComponentHost1.setDesiredStackVersion(stack130);
-    serviceComponentHost2.setStackVersion(stack120);
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    CommandReport cr1 = new CommandReport();
-    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr1.setTaskId(1);
-    cr1.setClusterName(DummyCluster);
-    cr1.setServiceName(HDFS);
-    cr1.setRole(DATANODE);
-    cr1.setRoleCommand("INSTALL");
-    cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr1.setStdErr("none");
-    cr1.setStdOut("dummy output");
-    cr1.setExitCode(777);
-
-    CommandReport cr2 = new CommandReport();
-    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr2.setTaskId(2);
-    cr2.setClusterName(DummyCluster);
-    cr2.setServiceName(HDFS);
-    cr2.setRole(NAMENODE);
-    cr2.setRoleCommand("INSTALL");
-    cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr2.setStdErr("none");
-    cr2.setStdOut("dummy output");
-    cr2.setExitCode(777);
-    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
-    reports.add(cr1);
-    reports.add(cr2);
+    cr.setActionId(StageUtils.getActionId(1, 1));
+    cr.setTaskId(1);
+    cr.setClusterName(DummyCluster);
+    cr.setServiceName(HDFS);
+    cr.setRole(DATANODE);
+    cr.setRoleCommand("INSTALL");
+    cr.setStatus("FAILED");
+    cr.setStdErr("none");
+    cr.setStdOut("dummy output");
+    cr.setExitCode(777);
+    reports.add(cr);
     hb.setReports(reports);
+    hb.setComponentStatus(new ArrayList<ComponentStatus>());
 
-    ActionQueue aq = new ActionQueue();
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
-              add(command);
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
-    assertEquals("State of SCH not change while operation is in progress",
-            State.UPGRADING, serviceComponentHost1.getState());
-    assertEquals("Stack version of SCH should not change after in progress report",
-            stack130, serviceComponentHost1.getDesiredStackVersion());
-    assertEquals("State of SCH not change while operation is  in progress",
-            State.INSTALLING, serviceComponentHost2.getState());
+    handler.getHeartbeatProcessor().processHeartbeat(hb);
+    State componentState1 = serviceComponentHost1.getState();
+    assertEquals("Host state should still be installing", State.INSTALLING,
+      componentState1);
   }
 
 
+
+
   @Test
   @SuppressWarnings("unchecked")
-  public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
-    Cluster cluster = getDummyCluster();
+  public void testStatusHeartbeatWithVersion() throws Exception {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -1840,117 +914,73 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
     ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
-            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
 
     StackId stack130 = new StackId("HDP-1.3.0");
     StackId stack120 = new StackId("HDP-1.2.0");
 
-    serviceComponentHost1.setState(State.UPGRADING);
-    serviceComponentHost2.setState(State.INSTALLING);
-
-    serviceComponentHost1.setStackVersion(stack120);
-    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.STARTED);
+    serviceComponentHost3.setState(State.STARTED);
+    serviceComponentHost1.setStackVersion(stack130);
     serviceComponentHost2.setStackVersion(stack120);
-
-    Stage s = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test",
-        "clusterHostInfo", "commandParamsStage", "hostParamsStage");
-    s.setStageId(stageId);
-    s.addHostRoleExecutionCommand(DummyHostname1, Role.DATANODE, RoleCommand.UPGRADE,
-      new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(),
-        DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
-      DummyCluster, "HDFS", false, false);
-    s.addHostRoleExecutionCommand(DummyHostname1, Role.NAMENODE, RoleCommand.INSTALL,
-      new ServiceComponentHostInstallEvent(Role.NAMENODE.toString(),
-        DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
-          DummyCluster, "HDFS", false, false);
-    List<Stage> stages = new ArrayList<Stage>();
-    stages.add(s);
-    Request request = new Request(stages, clusters);
-    actionDBAccessor.persistActions(request);
-    CommandReport cr = new CommandReport();
-    cr.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr.setTaskId(1);
-    cr.setClusterName(DummyCluster);
-    cr.setServiceName(HDFS);
-    cr.setRole(DATANODE);
-    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
-    cr.setStdErr("none");
-    cr.setStdOut("dummy output");
-    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-      Role.DATANODE.name(), cr);
-    cr.setRole(NAMENODE);
-    cr.setTaskId(2);
-    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-      Role.NAMENODE.name(), cr);
+    serviceComponentHost3.setStackVersion(stack120);
 
     HeartBeat hb = new HeartBeat();
     hb.setTimestamp(System.currentTimeMillis());
     hb.setResponseId(0);
     hb.setHostname(DummyHostname1);
     hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    CommandReport cr1 = new CommandReport();
-    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr1.setTaskId(1);
-    cr1.setClusterName(DummyCluster);
-    cr1.setServiceName(HDFS);
-    cr1.setRole(DATANODE);
-    cr1.setRoleCommand("INSTALL");
-    cr1.setStatus(HostRoleStatus.FAILED.toString());
-    cr1.setStdErr("none");
-    cr1.setStdOut("dummy output");
-    cr1.setExitCode(0);
+    hb.setReports(new ArrayList<CommandReport>());
+    hb.setAgentEnv(new AgentEnv());
+    hb.setMounts(new ArrayList<DiskInfo>());
 
-    CommandReport cr2 = new CommandReport();
-    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
-    cr2.setTaskId(2);
-    cr2.setClusterName(DummyCluster);
-    cr2.setServiceName(HDFS);
-    cr2.setRole(NAMENODE);
-    cr2.setRoleCommand("INSTALL");
-    cr2.setStatus(HostRoleStatus.FAILED.toString());
-    cr2.setStdErr("none");
-    cr2.setStdOut("dummy output");
-    cr2.setExitCode(0);
-    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
-    reports.add(cr1);
-    reports.add(cr2);
-    hb.setReports(reports);
+    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+    ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED,
+        SecurityState.UNSECURED, DATANODE, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
+    ComponentStatus componentStatus2 =
+        createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, SecurityState.UNSECURED, NAMENODE, "");
+    ComponentStatus componentStatus3 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED,
+        SecurityState.UNSECURED, HDFS_CLIENT, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
 
-    ActionQueue aq = new ActionQueue();
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
+    componentStatuses.add(componentStatus1);
+    componentStatuses.add(componentStatus2);
+    componentStatuses.add(componentStatus3);
+    hb.setComponentStatus(componentStatuses);
 
-    ActionManager am = getMockActionManager();
+    ActionQueue aq = new ActionQueue();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
-              add(command);
-              add(command);
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
     handler.handleHeartBeat(hb);
-    assertEquals("State of SCH should change after fail report",
-        State.UPGRADING, serviceComponentHost1.getState());
-    assertEquals("State of SCH should change after fail report",
-            State.INSTALL_FAILED, serviceComponentHost2.getState());
-    assertEquals("Stack version of SCH should not change after fail report",
-            stack120, serviceComponentHost1.getStackVersion());
-    assertEquals("Stack version of SCH should not change after fail report",
-            stack130, serviceComponentHost1.getDesiredStackVersion());
-    assertEquals("Stack version of SCH should not change after fail report",
-            State.INSTALL_FAILED, serviceComponentHost2.getState());
+    heartbeatProcessor.processHeartbeat(hb);
+
+    assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
+        stack130, serviceComponentHost1.getStackVersion());
+    assertEquals("Matching value " + serviceComponentHost2.getStackVersion(),
+        stack120, serviceComponentHost2.getStackVersion());
+    assertEquals("Matching value " + serviceComponentHost3.getStackVersion(),
+        stack130, serviceComponentHost3.getStackVersion());
+    assertTrue(hb.getAgentEnv().getHostHealth().getServerTimeStampAtReporting() >= hb.getTimestamp());
   }
 
 
+
   @Test
   @SuppressWarnings("unchecked")
   public void testRecoveryStatusReports() throws Exception {
     Clusters fsm = clusters;
 
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Host hostObject = clusters.getHost(DummyHostname1);
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
@@ -1964,7 +994,7 @@ public class TestHeartbeatHandler {
     ActionQueue aq = new ActionQueue();
 
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1, Role.DATANODE, null, null);
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -2032,7 +1062,7 @@ public class TestHeartbeatHandler {
   public void testProcessStatusReports() throws Exception {
     Clusters fsm = clusters;
 
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Host hostObject = clusters.getHost(DummyHostname1);
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
@@ -2047,7 +1077,7 @@ public class TestHeartbeatHandler {
 
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -2055,6 +1085,7 @@ public class TestHeartbeatHandler {
             }}).anyTimes();
     replay(am);
     HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
 
     Register reg = new Register();
     HostInfo hi = new HostInfo();
@@ -2092,6 +1123,7 @@ public class TestHeartbeatHandler {
     componentStatus.add(nameNodeStatus);
     hb1.setComponentStatus(componentStatus);
     handler.handleHeartBeat(hb1);
+    heartbeatProcessor.processHeartbeat(hb1);
     assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
 
     //Some slaves are down, masters are up
@@ -2116,6 +1148,7 @@ public class TestHeartbeatHandler {
     componentStatus.add(nameNodeStatus);
     hb2.setComponentStatus(componentStatus);
     handler.handleHeartBeat(hb2);
+    heartbeatProcessor.processHeartbeat(hb2);
     assertEquals(HostHealthStatus.HealthStatus.ALERT.name(), hostObject.getStatus());
 
     // mark the installed DN as maintenance
@@ -2142,6 +1175,7 @@ public class TestHeartbeatHandler {
     componentStatus.add(nameNodeStatus);
     hb2a.setComponentStatus(componentStatus);
     handler.handleHeartBeat(hb2a);
+    heartbeatProcessor.processHeartbeat(hb2a);
     assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
 
     hdfs.getServiceComponent(DATANODE).getServiceComponentHost(
@@ -2169,11 +1203,13 @@ public class TestHeartbeatHandler {
     componentStatus.add(nameNodeStatus);
     hb3.setComponentStatus(componentStatus);
     handler.handleHeartBeat(hb3);
+    heartbeatProcessor.processHeartbeat(hb3);
     assertEquals(HostHealthStatus.HealthStatus.UNHEALTHY.name(), hostObject.getStatus());
 
     //All are up
     hb1.setResponseId(4);
     handler.handleHeartBeat(hb1);
+    heartbeatProcessor.processHeartbeat(hb1);
     assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
 
     reset(am);
@@ -2199,10 +1235,12 @@ public class TestHeartbeatHandler {
     componentStatus.add(dataNodeStatus);
     hb4.setComponentStatus(componentStatus);
     handler.handleHeartBeat(hb4);
+    heartbeatProcessor.processHeartbeat(hb4);
     assertEquals(HostHealthStatus.HealthStatus.UNHEALTHY.name(), hostObject.getStatus());
 
     hb1.setResponseId(6);
     handler.handleHeartBeat(hb1);
+    heartbeatProcessor.processHeartbeat(hb1);
     assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
 
     //Some command reports
@@ -2225,6 +1263,7 @@ public class TestHeartbeatHandler {
     reports.add(cr1);
     hb5.setReports(reports);
     handler.handleHeartBeat(hb5);
+    heartbeatProcessor.processHeartbeat(hb5);
     assertEquals(HostHealthStatus.HealthStatus.ALERT.name(), hostObject.getStatus());
   }
 
@@ -2270,7 +1309,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -2278,12 +1317,13 @@ public class TestHeartbeatHandler {
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
 
     // CUSTOM_COMMAND and ACTIONEXECUTE reports are ignored
     // they should not change the host component state
     try {
       handler.handleHeartBeat(hb);
+      handler.getHeartbeatProcessor().processHeartbeat(hb);
     } catch (Exception e) {
       fail();
     }
@@ -2321,8 +1361,8 @@ public class TestHeartbeatHandler {
     expected.setStackVersion(dummyStackId.getStackVersion());
     expected.setComponents(dummyComponents);
 
-    getDummyCluster();
-    HeartBeatHandler handler = getHeartBeatHandler(getMockActionManager(),
+    heartbeatTestHelper.getDummyCluster();
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(heartbeatTestHelper.getMockActionManager(),
         new ActionQueue());
 
     ComponentsResponse actual = handler.handleComponents(DummyCluster);
@@ -2337,19 +1377,7 @@ public class TestHeartbeatHandler {
     assertEquals(expected.getComponents(), actual.getComponents());
   }
 
-  private ActionManager getMockActionManager() {
-    ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
-    Clusters clustersMock = createNiceMock(Clusters.class);
-    Configuration configurationMock = createNiceMock(Configuration.class);
-
-    ActionManager actionManager = createMockBuilder(ActionManager.class).
-            addMockedMethod("getTasks").
-            withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
-                    actionDBAccessor, new HostsMap((String) null), unitOfWork,
-                    injector.getInstance(RequestFactory.class), configurationMock, createNiceMock(AmbariEventPublisher.class)).
-            createMock();
-    return actionManager;
-  }
+
 
 
   private ComponentStatus createComponentStatus(String clusterName, String serviceName, String message,
@@ -2366,164 +1394,11 @@ public class TestHeartbeatHandler {
     return componentStatus1;
   }
 
-  private HeartBeatHandler getHeartBeatHandler(ActionManager am, ActionQueue aq)
-      throws InvalidStateTransitionException, AmbariException {
-    HeartBeatHandler handler = new HeartBeatHandler(clusters, aq, am, injector);
-    Register reg = new Register();
-    HostInfo hi = new HostInfo();
-    hi.setHostName(DummyHostname1);
-    hi.setOS(DummyOs);
-    hi.setOSRelease(DummyOSRelease);
-    reg.setHostname(DummyHostname1);
-    reg.setResponseId(0);
-    reg.setHardwareProfile(hi);
-    reg.setAgentVersion(metaInfo.getServerVersion());
-    handler.handleRegistration(reg);
-    return handler;
-  }
-
-  private Cluster getDummyCluster()
-      throws AmbariException {
-    StackEntity stackEntity = stackDAO.find(HDP_22_STACK.getStackName(), HDP_22_STACK.getStackVersion());
-    org.junit.Assert.assertNotNull(stackEntity);
-
-    // Create the cluster
-    ResourceTypeEntity resourceTypeEntity = new ResourceTypeEntity();
-    resourceTypeEntity.setId(ResourceType.CLUSTER.getId());
-    resourceTypeEntity.setName(ResourceType.CLUSTER.name());
-    resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity);
-
-    ResourceEntity resourceEntity = new ResourceEntity();
-    resourceEntity.setResourceType(resourceTypeEntity);
-
-    ClusterEntity clusterEntity = new ClusterEntity();
-    clusterEntity.setClusterName(DummyCluster);
-    clusterEntity.setClusterInfo("test_cluster_info1");
-    clusterEntity.setResource(resourceEntity);
-    clusterEntity.setDesiredStack(stackEntity);
-
-    clusterDAO.create(clusterEntity);
-
-    StackId stackId = new StackId(DummyStackId);
-
-    Cluster cluster = clusters.getCluster(DummyCluster);
-
-    cluster.setDesiredStackVersion(stackId);
-    cluster.setCurrentStackVersion(stackId);
-    helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
-    cluster.createClusterVersion(stackId, stackId.getStackVersion(), "admin",
-        RepositoryVersionState.UPGRADING);
-
-    Set<String> hostNames = new HashSet<String>(){{
-      add(DummyHostname1);
-    }};
-
-    Map<String, String> hostAttributes = new HashMap<String, String>();
-    hostAttributes.put("os_family", "redhat");
-    hostAttributes.put("os_release_version", "6.3");
-
-    List<HostEntity> hostEntities = new ArrayList<HostEntity>();
-    for(String hostName : hostNames) {
-      clusters.addHost(hostName);
-      Host host = clusters.getHost(hostName);
-      host.setHostAttributes(hostAttributes);
-      host.persist();
-
-      HostEntity hostEntity = hostDAO.findByName(hostName);
-      Assert.assertNotNull(hostEntity);
-      hostEntities.add(hostEntity);
-    }
-    clusterEntity.setHostEntities(hostEntities);
-    clusters.mapHostsToCluster(hostNames, DummyCluster);
-
-    return cluster;
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testCommandStatusProcesses() throws Exception {
-    Cluster cluster = getDummyCluster();
-    Service hdfs = cluster.addService(HDFS);
-    hdfs.persist();
-    hdfs.addServiceComponent(DATANODE).persist();
-    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-    hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
-
-    ActionQueue aq = new ActionQueue();
-
-    HeartBeat hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(0);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setReports(new ArrayList<CommandReport>());
-
-    List<Map<String, String>> procs = new ArrayList<Map<String, String>>();
-    Map<String, String> proc1info = new HashMap<String, String>();
-    proc1info.put("name", "a");
-    proc1info.put("status", "RUNNING");
-    procs.add(proc1info);
-
-    Map<String, String> proc2info = new HashMap<String, String>();
-    proc2info.put("name", "b");
-    proc2info.put("status", "NOT_RUNNING");
-    procs.add(proc2info);
-
-    Map<String, Object> extra = new HashMap<String, Object>();
-    extra.put("processes", procs);
-
-    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
-    ComponentStatus componentStatus1 = new ComponentStatus();
-    componentStatus1.setClusterName(DummyCluster);
-    componentStatus1.setServiceName(HDFS);
-    componentStatus1.setMessage(DummyHostStatus);
-    componentStatus1.setStatus(State.STARTED.name());
-    componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
-    componentStatus1.setComponentName(DATANODE);
-
-    componentStatus1.setExtra(extra);
-    componentStatuses.add(componentStatus1);
-    hb.setComponentStatus(componentStatuses);
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-            Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-            new ArrayList<HostRoleCommand>() {{
-              add(command);
-            }}).anyTimes();
-    replay(am);
-
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    handler.handleHeartBeat(hb);
-    ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-
-    Assert.assertEquals(Integer.valueOf(2), Integer.valueOf(sch.getProcesses().size()));
-
-    hb = new HeartBeat();
-    hb.setTimestamp(System.currentTimeMillis());
-    hb.setResponseId(1);
-    hb.setHostname(DummyHostname1);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setReports(new ArrayList<CommandReport>());
-
-    componentStatus1 = new ComponentStatus();
-    componentStatus1.setClusterName(DummyCluster);
-    componentStatus1.setServiceName(HDFS);
-    componentStatus1.setMessage(DummyHostStatus);
-    componentStatus1.setStatus(State.STARTED.name());
-    componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
-    componentStatus1.setComponentName(DATANODE);
-    hb.setComponentStatus(Collections.singletonList(componentStatus1));
-
-    handler.handleHeartBeat(hb);
-  }
 
   @Test
   @SuppressWarnings("unchecked")
   public void testCommandStatusProcesses_empty() throws Exception {
-    Cluster cluster = getDummyCluster();
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
     Service hdfs = cluster.addService(HDFS);
     hdfs.persist();
     hdfs.addServiceComponent(DATANODE).persist();
@@ -2553,137 +1428,19 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
             }});
     replay(am);
 
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
     ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
 
     Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(sch.getProcesses().size()));
   }
 
-  /**
-   * Tests that if there is an invalid cluster in heartbeat data, the heartbeat
-   * doesn't fail.
-   *
-   * @throws Exception
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testHeartBeatWithAlertAndInvalidCluster() throws Exception {
-    ActionManager am = getMockActionManager();
-
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-        new ArrayList<HostRoleCommand>());
-
-    replay(am);
-
-    Cluster cluster = getDummyCluster();
-    Clusters fsm = clusters;
-    Host hostObject = clusters.getHost(DummyHostname1);
-    hostObject.setIPv4("ipv4");
-    hostObject.setIPv6("ipv6");
-    hostObject.setOsType(DummyOsType);
-
-    ActionQueue aq = new ActionQueue();
-
-    HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
-    Register reg = new Register();
-    HostInfo hi = new HostInfo();
-    hi.setHostName(DummyHostname1);
-    hi.setOS(DummyOs);
-    hi.setOSRelease(DummyOSRelease);
-    reg.setHostname(DummyHostname1);
-    reg.setHardwareProfile(hi);
-    reg.setAgentVersion(metaInfo.getServerVersion());
-    handler.handleRegistration(reg);
-
-    hostObject.setState(HostState.UNHEALTHY);
-
-    ExecutionCommand execCmd = new ExecutionCommand();
-    execCmd.setRequestAndStage(2, 34);
-    execCmd.setHostname(DummyHostname1);
-    aq.enqueue(DummyHostname1, new ExecutionCommand());
-
-    HeartBeat hb = new HeartBeat();
-    HostStatus hs = new HostStatus(Status.HEALTHY, DummyHostStatus);
-
-    hb.setResponseId(0);
-    hb.setNodeStatus(hs);
-    hb.setHostname(DummyHostname1);
-
-    Alert alert = new Alert("foo", "bar", "baz", "foobar", "foobarbaz",
-        AlertState.OK);
-
-    alert.setCluster("BADCLUSTER");
-
-    List<Alert> alerts = Collections.singletonList(alert);
-    hb.setAlerts(alerts);
-
-    // should NOT throw AmbariException from alerts.
-    handler.handleHeartBeat(hb);
-  }
-
-  @Test
-  public void testInstallPackagesWithVersion() throws Exception {
-    // required since this test method checks the DAO result of handling a
-    // heartbeat which performs some async tasks
-    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
-
-    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
-        Role.DATANODE, null, null);
-
-    ActionManager am = getMockActionManager();
-    expect(am.getTasks(anyObject(List.class))).andReturn(
-        Collections.singletonList(command)).anyTimes();
-    replay(am);
-
-    Cluster cluster = getDummyCluster();
-    HeartBeatHandler handler = getHeartBeatHandler(am, new ActionQueue());
-    HeartBeat hb = new HeartBeat();
-
-    JsonObject json = new JsonObject();
-    json.addProperty("actual_version", "2.2.1.0-2222");
-    json.addProperty("package_installation_result", "SUCCESS");
-    json.addProperty("installed_repository_version", "0.1");
-    json.addProperty("stack_id", cluster.getDesiredStackVersion().getStackId());
-
-
-    CommandReport cmdReport = new CommandReport();
-    cmdReport.setActionId(StageUtils.getActionId(requestId, stageId));
-    cmdReport.setTaskId(1);
-    cmdReport.setCustomCommand("install_packages");
-    cmdReport.setStructuredOut(json.toString());
-    cmdReport.setRoleCommand(RoleCommand.ACTIONEXECUTE.name());
-    cmdReport.setStatus(HostRoleStatus.COMPLETED.name());
-    cmdReport.setRole("install_packages");
-    cmdReport.setClusterName(DummyCluster);
-
-    hb.setReports(Collections.singletonList(cmdReport));
-    hb.setTimestamp(0L);
-    hb.setResponseId(0);
-    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-    hb.setHostname(DummyHostname1);
-    hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
-    StackId stackId = new StackId("HDP", "0.1");
-
-    RepositoryVersionDAO dao = injector.getInstance(RepositoryVersionDAO.class);
-    RepositoryVersionEntity entity = dao.findByStackAndVersion(stackId, "0.1");
-    Assert.assertNotNull(entity);
-
-    handler.handleHeartBeat(hb);
-
-    entity = dao.findByStackAndVersion(stackId, "0.1");
-    Assert.assertNull(entity);
-
-    entity = dao.findByStackAndVersion(stackId, "2.2.1.0-2222");
-    Assert.assertNotNull(entity);
-  }
 
   @Test
   public void testInjectKeytabApplicableHost() throws Exception {
@@ -2758,14 +1515,14 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
         }});
     replay(am);
 
-    getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "SET_KEYTAB", targetHost);
+    heartbeatTestHelper.getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "SET_KEYTAB", targetHost);
 
     return executionCommand.getKerberosCommandParams();
   }
@@ -2789,14 +1546,14 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = getMockActionManager();
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
         }});
     replay(am);
 
-    getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "REMOVE_KEYTAB", targetHost);
+    heartbeatTestHelper.getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "REMOVE_KEYTAB", targetHost);
 
     return executionCommand.getKerberosCommandParams();
   }