You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/02/23 22:21:54 UTC

[09/11] ambari git commit: AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
new file mode 100644
index 0000000..eb99142
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
@@ -0,0 +1,1290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import com.google.gson.JsonObject;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
+import junit.framework.Assert;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.apache.ambari.server.utils.StageUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostStatus;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOsType;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE_MASTER;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HeartbeatProcessorTest {
+
+  // Logger for this test; bound to this class so log output is attributed correctly.
+  private static final Logger log = LoggerFactory.getLogger(HeartbeatProcessorTest.class);
+  // Guice injector, rebuilt in setup() before every test.
+  private Injector injector;
+  // Cluster registry obtained from the injector in setup().
+  private Clusters clusters;
+  // Request and stage ids used to build action ids (StageUtils.getActionId)
+  // and to populate the action DB in the tests below.
+  long requestId = 23;
+  long stageId = 31;
+  // Obtained from the injector in setup(); handed to the ActionManager constructor in testCommandReport().
+  private UnitOfWork unitOfWork;
+
+  // The @Inject fields below are filled by injector.injectMembers(this) in setup().
+  @Inject
+  Configuration config;
+
+  @Inject
+  ActionDBAccessor actionDBAccessor;
+
+  @Inject
+  HeartbeatTestHelper heartbeatTestHelper;
+
+  @Inject
+  private HostRoleCommandFactory hostRoleCommandFactory;
+
+  @Inject
+  private HostDAO hostDAO;
+
+  @Inject
+  private StageFactory stageFactory;
+
+  @Inject
+  private AmbariMetaInfo metaInfo;
+
+  // HDP 2.2.0 stack identifier shared by tests in this class.
+  private final static StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
+
+  /**
+   * Builds a fresh injector from the shared heartbeat test module and wires
+   * the fields this test relies on.
+   *
+   * @throws Exception if the test environment cannot be created
+   */
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(HeartbeatTestHelper.getTestModule());
+
+    // The returned instance is discarded; it is requested only so that it gets
+    // constructed (presumably this initializes JPA persistence - confirm).
+    injector.getInstance(GuiceJpaInitializer.class);
+
+    clusters = injector.getInstance(Clusters.class);
+    // Populates the @Inject-annotated fields of this test instance.
+    injector.injectMembers(this);
+    unitOfWork = injector.getInstance(UnitOfWork.class);
+  }
+
+  @After
+  public void teardown() throws AmbariException {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  /**
+   * Processes a heartbeat carrying a COMPLETED START report for DATANODE that
+   * includes configuration tags, and verifies that the DATANODE host component
+   * ends up with exactly one actual-config entry.
+   *
+   * @throws Exception if cluster setup or heartbeat processing fails
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHeartbeatWithConfigs() throws Exception {
+    // Register HDFS with three components, all hosted on DummyHostname1.
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.INSTALLED);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    // Single command report: DATANODE START completed, tagged with one config type.
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setTaskId(1);
+    cr.setRole(DATANODE);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setRoleCommand("START");
+    cr.setClusterName(DummyCluster);
+
+    cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
+      put("global", new HashMap<String, String>() {{
+        put("tag", "version1");
+      }});
+    }});
+
+    reports.add(cr);
+    hb.setReports(reports);
+
+    // Sanity check: the dummy host was persisted by the cluster setup.
+    HostEntity host1 = hostDAO.findByName(DummyHostname1);
+    Assert.assertNotNull(host1);
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    // The mocked ActionManager answers a single getTasks() call with one task.
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    // the heartbeat test passed if actual configs is populated
+    Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
+    // expected value first, so a failure message reports expected/actual correctly
+    Assert.assertEquals(1, serviceComponentHost1.getActualConfigs().size());
+  }
+
+  /**
+   * Processes a heartbeat carrying a COMPLETED INSTALL report for HDFS_CLIENT
+   * on a host component flagged as restart-required, and verifies that the
+   * flag is cleared and one actual-config entry is recorded.
+   *
+   * @throws Exception if cluster setup or heartbeat processing fails
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRestartRequiredAfterInstallClient() throws Exception {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
+
+    // Precondition: the client is installed and marked as needing a restart.
+    serviceComponentHost.setState(State.INSTALLED);
+    serviceComponentHost.setRestartRequired(true);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setRoleCommand("INSTALL");
+    cr.setCustomCommand("EXECUTION_COMMAND");
+    cr.setTaskId(1);
+    cr.setRole(HDFS_CLIENT);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setClusterName(DummyCluster);
+    cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
+      put("global", new HashMap<String, String>() {{
+        put("tag", "version1");
+      }});
+    }});
+    reports.add(cr);
+    hb.setReports(reports);
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    // The mocked ActionManager answers a single getTasks() call with the same task twice.
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    Assert.assertNotNull(serviceComponentHost.getActualConfigs());
+    Assert.assertFalse(serviceComponentHost.isRestartRequired());
+    // expected value first, so a failure message reports expected/actual correctly
+    Assert.assertEquals(1, serviceComponentHost.getActualConfigs().size());
+  }
+
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHeartbeatCustomCommandWithConfigs() throws Exception {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.INSTALLED);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setRoleCommand("CUSTOM_COMMAND");
+    cr.setCustomCommand("RESTART");
+    cr.setTaskId(1);
+    cr.setRole(DATANODE);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setClusterName(DummyCluster);
+    cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+    }});
+    CommandReport crn = new CommandReport();
+    crn.setActionId(StageUtils.getActionId(requestId, stageId));
+    crn.setServiceName(HDFS);
+    crn.setRoleCommand("CUSTOM_COMMAND");
+    crn.setCustomCommand("START");
+    crn.setTaskId(1);
+    crn.setRole(NAMENODE);
+    crn.setStatus("COMPLETED");
+    crn.setStdErr("");
+    crn.setStdOut("");
+    crn.setExitCode(215);
+    crn.setClusterName(DummyCluster);
+    crn.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+    }});
+
+    reports.add(cr);
+    reports.add(crn);
+    hb.setReports(reports);
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    // the heartbeat test passed if actual configs is populated
+    Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
+    Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
+    Assert.assertNotNull(serviceComponentHost2.getActualConfigs());
+    Assert.assertEquals(serviceComponentHost2.getActualConfigs().size(), 1);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHeartbeatCustomStartStop() throws Exception {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.STARTED);
+    serviceComponentHost1.setRestartRequired(true);
+    serviceComponentHost2.setRestartRequired(true);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setRoleCommand("CUSTOM_COMMAND");
+    cr.setCustomCommand("START");
+    cr.setTaskId(1);
+    cr.setRole(DATANODE);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setClusterName(DummyCluster);
+    CommandReport crn = new CommandReport();
+    crn.setActionId(StageUtils.getActionId(requestId, stageId));
+    crn.setServiceName(HDFS);
+    crn.setRoleCommand("CUSTOM_COMMAND");
+    crn.setCustomCommand("STOP");
+    crn.setTaskId(1);
+    crn.setRole(NAMENODE);
+    crn.setStatus("COMPLETED");
+    crn.setStdErr("");
+    crn.setStdOut("");
+    crn.setExitCode(215);
+    crn.setClusterName(DummyCluster);
+
+    reports.add(cr);
+    reports.add(crn);
+    hb.setReports(reports);
+
+    assertTrue(serviceComponentHost1.isRestartRequired());
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    // the heartbeat test passed if actual configs is populated
+    State componentState1 = serviceComponentHost1.getState();
+    assertEquals(State.STARTED, componentState1);
+    assertFalse(serviceComponentHost1.isRestartRequired());
+    State componentState2 = serviceComponentHost2.getState();
+    assertEquals(State.INSTALLED, componentState2);
+    assertTrue(serviceComponentHost2.isRestartRequired());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStatusHeartbeat() throws Exception {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(SECONDARY_NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost1.setSecurityState(SecurityState.UNSECURED);
+    serviceComponentHost2.setState(State.INSTALLED);
+    serviceComponentHost2.setSecurityState(SecurityState.SECURING);
+    serviceComponentHost3.setState(State.STARTING);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setReports(new ArrayList<CommandReport>());
+    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+    ComponentStatus componentStatus1 = new ComponentStatus();
+    componentStatus1.setClusterName(DummyCluster);
+    componentStatus1.setServiceName(HDFS);
+    componentStatus1.setMessage(DummyHostStatus);
+    componentStatus1.setStatus(State.STARTED.name());
+    componentStatus1.setSecurityState(SecurityState.SECURED_KERBEROS.name());
+    componentStatus1.setComponentName(DATANODE);
+    componentStatuses.add(componentStatus1);
+    ComponentStatus componentStatus2 = new ComponentStatus();
+    componentStatus2.setClusterName(DummyCluster);
+    componentStatus2.setServiceName(HDFS);
+    componentStatus2.setMessage(DummyHostStatus);
+    componentStatus2.setStatus(State.STARTED.name());
+    componentStatus2.setSecurityState(SecurityState.UNSECURED.name());
+    componentStatus2.setComponentName(SECONDARY_NAMENODE);
+    componentStatuses.add(componentStatus2);
+    hb.setComponentStatus(componentStatuses);
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+    State componentState1 = serviceComponentHost1.getState();
+    State componentState2 = serviceComponentHost2.getState();
+    State componentState3 = serviceComponentHost3.getState();
+    assertEquals(State.STARTED, componentState1);
+    assertEquals(SecurityState.SECURED_KERBEROS, serviceComponentHost1.getSecurityState());
+    assertEquals(State.INSTALLED, componentState2);
+    assertEquals(SecurityState.SECURING, serviceComponentHost2.getSecurityState());
+    assertEquals(State.STARTED, componentState3);
+    assertEquals(SecurityState.UNSECURED, serviceComponentHost3.getSecurityState());
+  }
+
+
+  /**
+   * Feeds a COMPLETED command report for HBASE_MASTER directly into an
+   * ActionManager constructed over the action DB, then checks that the manager
+   * and the stage re-read from the DB both expose the reported status and
+   * exit code.
+   *
+   * @throws AmbariException if host, cluster or action DB setup fails
+   */
+  @Test
+  public void testCommandReport() throws AmbariException {
+    injector.injectMembers(this);
+    clusters.addHost(DummyHostname1);
+    clusters.getHost(DummyHostname1).persist();
+
+    clusters.addCluster(DummyCluster, new StackId(DummyStackId));
+
+    ActionDBAccessor actionDb = injector.getInstance(ActionDBAccessorImpl.class);
+    ActionManager actionManager = new ActionManager(
+        5000,
+        1200000,
+        new ActionQueue(),
+        clusters,
+        actionDb,
+        new HostsMap((String) null),
+        unitOfWork,
+        injector.getInstance(RequestFactory.class),
+        null,
+        null);
+
+    heartbeatTestHelper.populateActionDB(actionDb, DummyHostname1, requestId, stageId);
+    Stage scheduledStage = actionDb.getAllStages(requestId).get(0);
+    Assert.assertEquals(stageId, scheduledStage.getStageId());
+
+    // Mark the HBASE_MASTER role as queued and scheduled before reporting on it.
+    scheduledStage.setHostRoleStatus(DummyHostname1, HBASE_MASTER, HostRoleStatus.QUEUED);
+    actionDb.hostRoleScheduled(scheduledStage, DummyHostname1, HBASE_MASTER);
+
+    CommandReport report = new CommandReport();
+    report.setActionId(StageUtils.getActionId(requestId, stageId));
+    report.setTaskId(1);
+    report.setRole(HBASE_MASTER);
+    report.setStatus("COMPLETED");
+    report.setStdErr("");
+    report.setStdOut("");
+    report.setExitCode(215);
+    report.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+    }});
+
+    List<CommandReport> commandReports = new ArrayList<CommandReport>();
+    commandReports.add(report);
+    actionManager.processTaskResponse(DummyHostname1, commandReports,
+        scheduledStage.getOrderedHostRoleCommands());
+
+    // View through the ActionManager.
+    assertEquals(215,
+        actionManager.getAction(requestId, stageId).getExitCode(DummyHostname1, HBASE_MASTER));
+    assertEquals(HostRoleStatus.COMPLETED,
+        actionManager.getAction(requestId, stageId).getHostRoleStatus(DummyHostname1, HBASE_MASTER));
+
+    // View through a stage freshly loaded from the action DB.
+    Stage reloadedStage = actionDb.getAllStages(requestId).get(0);
+    assertEquals(HostRoleStatus.COMPLETED,
+        reloadedStage.getHostRoleStatus(DummyHostname1, HBASE_MASTER));
+    assertEquals(215,
+        reloadedStage.getExitCode(DummyHostname1, HBASE_MASTER));
+  }
+
+  /**
+   * Tests the fact that when START and STOP commands are in progress, and heartbeat
+   * forces the host component state to STARTED or INSTALLED, there are no undesired
+   * side effects.
+   * <p>
+   * A single {@link HeartBeat} and a single {@link CommandReport} are reused for
+   * the whole test: between calls to {@code processHeartbeat} only the response
+   * id, report status, exit code and role command are changed, and the DATANODE
+   * host component state is asserted after each call.
+   *
+   * @throws AmbariException declared by the cluster and service setup calls
+   * @throws InvalidStateTransitionException declared by the test signature
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCommandReportOnHeartbeatUpdatedState()
+      throws AmbariException, InvalidStateTransitionException {
+    // Register HDFS with a single DATANODE hosted on DummyHostname1.
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+
+    // The one report that is mutated in place for every step below.
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setTaskId(1);
+    cr.setClusterName(DummyCluster);
+    cr.setServiceName(HDFS);
+    cr.setRole(DATANODE);
+    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr.setStdErr("none");
+    cr.setStdOut("dummy output");
+    cr.setExitCode(777);
+    cr.setRoleCommand("START");
+    reports.add(cr);
+    hb.setReports(reports);
+    hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    // anyTimes(): the same task list is returned for every heartbeat processed below.
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+        }}).anyTimes();
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    // Step 1: START in progress -> state is expected to stay INSTALLED.
+    heartbeatProcessor.processHeartbeat(hb);
+
+    assertEquals("Host state should  be " + State.INSTALLED,
+        State.INSTALLED, serviceComponentHost1.getState());
+
+    // Step 2: START completed -> STARTED.
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(1);
+    cr.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr.setExitCode(0);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.STARTED,
+        State.STARTED, serviceComponentHost1.getState());
+
+    // Step 3: STOP in progress -> state is expected to stay STARTED.
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(2);
+    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr.setRoleCommand("STOP");
+    cr.setExitCode(777);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.STARTED,
+        State.STARTED, serviceComponentHost1.getState());
+
+    // Step 4: STOP completed -> INSTALLED.
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(3);
+    cr.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr.setExitCode(0);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.INSTALLED,
+        State.INSTALLED, serviceComponentHost1.getState());
+
+    // validate the transitions when there is no heartbeat
+    // (here the component state is set directly to STARTING / STOPPING by the
+    // test before each in-progress report, instead of being left as-is)
+    serviceComponentHost1.setState(State.STARTING);
+    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr.setExitCode(777);
+    cr.setRoleCommand("START");
+    hb.setResponseId(4);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.STARTING,
+        State.STARTING, serviceComponentHost1.getState());
+
+    // START completed from STARTING -> STARTED.
+    cr.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr.setExitCode(0);
+    hb.setResponseId(5);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.STARTED,
+        State.STARTED, serviceComponentHost1.getState());
+
+    // STOP in progress from a directly-set STOPPING state -> stays STOPPING.
+    serviceComponentHost1.setState(State.STOPPING);
+    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr.setExitCode(777);
+    cr.setRoleCommand("STOP");
+    hb.setResponseId(6);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.STOPPING,
+        State.STOPPING, serviceComponentHost1.getState());
+
+    // STOP completed from STOPPING -> INSTALLED.
+    cr.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr.setExitCode(0);
+    hb.setResponseId(7);
+
+    heartbeatProcessor.processHeartbeat(hb);
+    assertEquals("Host state should be " + State.INSTALLED,
+        State.INSTALLED, serviceComponentHost1.getState());
+  }
+
+  /**
+   * Walks a DATANODE host component that starts in {@code UPGRADING} through a
+   * series of command reports and checks its state after each processed
+   * heartbeat: IN_PROGRESS keeps it in UPGRADING, COMPLETED moves it to
+   * INSTALLED, and FAILED, PENDING and QUEUED reports leave it in UPGRADING.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
+    // Cluster with a single HDFS/DATANODE host component.
+    Cluster dummyCluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfsService = dummyCluster.addService(HDFS);
+    hdfsService.persist();
+    hdfsService.addServiceComponent(DATANODE).persist();
+    hdfsService.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue actionQueue = new ActionQueue();
+
+    ServiceComponentHost dataNode = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    dataNode.setState(State.UPGRADING);
+
+    HeartBeat heartbeat = new HeartBeat();
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(0);
+    heartbeat.setHostname(DummyHostname1);
+    heartbeat.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+
+    // One report instance is reused; only its status and exit code are
+    // mutated between the heartbeats below.
+    List<CommandReport> commandReports = new ArrayList<CommandReport>();
+    CommandReport report = new CommandReport();
+    report.setActionId(StageUtils.getActionId(requestId, stageId));
+    report.setTaskId(1);
+    report.setClusterName(DummyCluster);
+    report.setServiceName(HDFS);
+    report.setRole(DATANODE);
+    report.setRoleCommand("INSTALL");
+    report.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    report.setStdErr("none");
+    report.setStdOut("dummy output");
+    report.setExitCode(777);
+    commandReports.add(report);
+    heartbeat.setReports(commandReports);
+    heartbeat.setComponentStatus(new ArrayList<ComponentStatus>());
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+    tasks.add(command);
+
+    ActionManager actionManager = heartbeatTestHelper.getMockActionManager();
+    expect(actionManager.getTasks(anyObject(List.class))).andReturn(tasks).anyTimes();
+    replay(actionManager);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(actionManager, actionQueue);
+    HeartbeatProcessor processor = handler.getHeartbeatProcessor();
+
+    // IN_PROGRESS report: state stays UPGRADING.
+    processor.processHeartbeat(heartbeat);
+    assertEquals("Host state should  be " + State.UPGRADING,
+        State.UPGRADING, dataNode.getState());
+
+    // COMPLETED report: state becomes INSTALLED.
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(1);
+    report.setStatus(HostRoleStatus.COMPLETED.toString());
+    report.setExitCode(0);
+    processor.processHeartbeat(heartbeat);
+    assertEquals("Host state should be " + State.INSTALLED,
+        State.INSTALLED, dataNode.getState());
+
+    // FAILED report after resetting to UPGRADING: state stays UPGRADING.
+    dataNode.setState(State.UPGRADING);
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(2);
+    report.setStatus(HostRoleStatus.FAILED.toString());
+    report.setExitCode(3);
+    processor.processHeartbeat(heartbeat);
+    assertEquals("Host state should be " + State.UPGRADING,
+        State.UPGRADING, dataNode.getState());
+
+    // PENDING report: state stays UPGRADING.
+    dataNode.setState(State.UPGRADING);
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(3);
+    report.setStatus(HostRoleStatus.PENDING.toString());
+    report.setExitCode(55);
+    processor.processHeartbeat(heartbeat);
+    assertEquals("Host state should be " + State.UPGRADING,
+        State.UPGRADING, dataNode.getState());
+
+    // QUEUED report: state stays UPGRADING.
+    dataNode.setState(State.UPGRADING);
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(4);
+    report.setStatus(HostRoleStatus.QUEUED.toString());
+    report.setExitCode(55);
+    processor.processHeartbeat(heartbeat);
+    assertEquals("Host state should be " + State.UPGRADING,
+        State.UPGRADING, dataNode.getState());
+  }
+
+
+  /**
+   * Sends a component status carrying a "processes" entry with two process
+   * descriptions and checks that the DATANODE host component then reports two
+   * processes; afterwards processes a second heartbeat whose component status
+   * has no extra data (no assertion is made on the second heartbeat).
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCommandStatusProcesses() throws Exception {
+    Cluster dummyCluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfsService = dummyCluster.addService(HDFS);
+    hdfsService.persist();
+    hdfsService.addServiceComponent(DATANODE).persist();
+    hdfsService.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfsService.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
+
+    ActionQueue actionQueue = new ActionQueue();
+
+    HeartBeat firstHeartbeat = new HeartBeat();
+    firstHeartbeat.setTimestamp(System.currentTimeMillis());
+    firstHeartbeat.setResponseId(0);
+    firstHeartbeat.setHostname(DummyHostname1);
+    firstHeartbeat.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    firstHeartbeat.setReports(new ArrayList<CommandReport>());
+
+    // Two process descriptions, one running and one not.
+    Map<String, String> runningProcess = new HashMap<String, String>();
+    runningProcess.put("name", "a");
+    runningProcess.put("status", "RUNNING");
+
+    Map<String, String> stoppedProcess = new HashMap<String, String>();
+    stoppedProcess.put("name", "b");
+    stoppedProcess.put("status", "NOT_RUNNING");
+
+    List<Map<String, String>> processes = new ArrayList<Map<String, String>>();
+    processes.add(runningProcess);
+    processes.add(stoppedProcess);
+
+    Map<String, Object> extraData = new HashMap<String, Object>();
+    extraData.put("processes", processes);
+
+    ComponentStatus statusWithProcesses = new ComponentStatus();
+    statusWithProcesses.setClusterName(DummyCluster);
+    statusWithProcesses.setServiceName(HDFS);
+    statusWithProcesses.setMessage(DummyHostStatus);
+    statusWithProcesses.setStatus(State.STARTED.name());
+    statusWithProcesses.setSecurityState(SecurityState.UNSECURED.name());
+    statusWithProcesses.setComponentName(DATANODE);
+    statusWithProcesses.setExtra(extraData);
+
+    List<ComponentStatus> statuses = new ArrayList<ComponentStatus>();
+    statuses.add(statusWithProcesses);
+    firstHeartbeat.setComponentStatus(statuses);
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+    tasks.add(command);
+
+    ActionManager actionManager = heartbeatTestHelper.getMockActionManager();
+    expect(actionManager.getTasks(anyObject(List.class))).andReturn(tasks).anyTimes();
+    replay(actionManager);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(actionManager, actionQueue);
+    HeartbeatProcessor processor = handler.getHeartbeatProcessor();
+    processor.processHeartbeat(firstHeartbeat);
+
+    ServiceComponentHost dataNode =
+        hdfsService.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    Assert.assertEquals(2, dataNode.getProcesses().size());
+
+    // Second heartbeat: same component status but without any extra data.
+    HeartBeat secondHeartbeat = new HeartBeat();
+    secondHeartbeat.setTimestamp(System.currentTimeMillis());
+    secondHeartbeat.setResponseId(1);
+    secondHeartbeat.setHostname(DummyHostname1);
+    secondHeartbeat.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    secondHeartbeat.setReports(new ArrayList<CommandReport>());
+
+    ComponentStatus plainStatus = new ComponentStatus();
+    plainStatus.setClusterName(DummyCluster);
+    plainStatus.setServiceName(HDFS);
+    plainStatus.setMessage(DummyHostStatus);
+    plainStatus.setStatus(State.STARTED.name());
+    plainStatus.setSecurityState(SecurityState.UNSECURED.name());
+    plainStatus.setComponentName(DATANODE);
+    secondHeartbeat.setComponentStatus(Collections.singletonList(plainStatus));
+
+    processor.processHeartbeat(secondHeartbeat);
+  }
+
+  /**
+   * Reports COMPLETED UPGRADE commands for DATANODE (in UPGRADING state) and
+   * NAMENODE (in INSTALLING state) and checks that only the DATANODE's stack
+   * version is moved to its desired stack version.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
+    Cluster dummyCluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfsService = dummyCluster.addService(HDFS);
+    hdfsService.persist();
+    hdfsService.addServiceComponent(DATANODE).persist();
+    hdfsService.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfsService.addServiceComponent(NAMENODE).persist();
+    hdfsService.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfsService.addServiceComponent(HDFS_CLIENT).persist();
+    hdfsService.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost dataNode = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost nameNode = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId targetStack = new StackId("HDP-1.3.0");
+    StackId currentStack = new StackId("HDP-1.2.0");
+
+    dataNode.setState(State.UPGRADING);
+    nameNode.setState(State.INSTALLING);
+
+    dataNode.setStackVersion(currentStack);
+    dataNode.setDesiredStackVersion(targetStack);
+    nameNode.setStackVersion(currentStack);
+
+    HeartBeat heartbeat = new HeartBeat();
+    heartbeat.setTimestamp(System.currentTimeMillis());
+    heartbeat.setResponseId(0);
+    heartbeat.setHostname(DummyHostname1);
+    heartbeat.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+
+    CommandReport dataNodeReport = new CommandReport();
+    dataNodeReport.setActionId(StageUtils.getActionId(requestId, stageId));
+    dataNodeReport.setTaskId(1);
+    dataNodeReport.setClusterName(DummyCluster);
+    dataNodeReport.setServiceName(HDFS);
+    dataNodeReport.setRole(DATANODE);
+    dataNodeReport.setStatus(HostRoleStatus.COMPLETED.toString());
+    dataNodeReport.setStdErr("none");
+    dataNodeReport.setStdOut("dummy output");
+    dataNodeReport.setExitCode(0);
+    dataNodeReport.setRoleCommand(RoleCommand.UPGRADE.toString());
+
+    CommandReport nameNodeReport = new CommandReport();
+    nameNodeReport.setActionId(StageUtils.getActionId(requestId, stageId));
+    nameNodeReport.setTaskId(2);
+    nameNodeReport.setClusterName(DummyCluster);
+    nameNodeReport.setServiceName(HDFS);
+    nameNodeReport.setRole(NAMENODE);
+    nameNodeReport.setStatus(HostRoleStatus.COMPLETED.toString());
+    nameNodeReport.setStdErr("none");
+    nameNodeReport.setStdOut("dummy output");
+    nameNodeReport.setExitCode(0);
+    nameNodeReport.setRoleCommand(RoleCommand.UPGRADE.toString());
+
+    List<CommandReport> commandReports = new ArrayList<CommandReport>();
+    commandReports.add(dataNodeReport);
+    commandReports.add(nameNodeReport);
+    heartbeat.setReports(commandReports);
+
+    ActionQueue actionQueue = new ActionQueue();
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+    // The same command instance is returned for both reported tasks.
+    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+    tasks.add(command);
+    tasks.add(command);
+
+    ActionManager actionManager = heartbeatTestHelper.getMockActionManager();
+    expect(actionManager.getTasks(anyObject(List.class))).andReturn(tasks);
+    replay(actionManager);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(actionManager, actionQueue);
+    HeartbeatProcessor processor = handler.getHeartbeatProcessor();
+    processor.processHeartbeat(heartbeat);
+
+    assertEquals("Stack version for SCH should be updated to " +
+            dataNode.getDesiredStackVersion(),
+        targetStack, dataNode.getStackVersion());
+    assertEquals("Stack version for SCH should not change ",
+        currentStack, nameNode.getStackVersion());
+  }
+
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId stack130 = new StackId("HDP-1.3.0");
+    StackId stack120 = new StackId("HDP-1.2.0");
+
+    serviceComponentHost1.setState(State.UPGRADING);
+    serviceComponentHost2.setState(State.INSTALLING);
+
+    serviceComponentHost1.setStackVersion(stack120);
+    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost2.setStackVersion(stack120);
+
+    Stage s = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test",
+        "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+    s.setStageId(stageId);
+    s.addHostRoleExecutionCommand(DummyHostname1, Role.DATANODE, RoleCommand.UPGRADE,
+        new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(),
+            DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
+        DummyCluster, "HDFS", false, false);
+    s.addHostRoleExecutionCommand(DummyHostname1, Role.NAMENODE, RoleCommand.INSTALL,
+        new ServiceComponentHostInstallEvent(Role.NAMENODE.toString(),
+            DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
+        DummyCluster, "HDFS", false, false);
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    Request request = new Request(stages, clusters);
+    actionDBAccessor.persistActions(request);
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setTaskId(1);
+    cr.setClusterName(DummyCluster);
+    cr.setServiceName(HDFS);
+    cr.setRole(DATANODE);
+    cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr.setStdErr("none");
+    cr.setStdOut("dummy output");
+    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
+        Role.DATANODE.name(), cr);
+    cr.setRole(NAMENODE);
+    cr.setTaskId(2);
+    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
+        Role.NAMENODE.name(), cr);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    CommandReport cr1 = new CommandReport();
+    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr1.setTaskId(1);
+    cr1.setClusterName(DummyCluster);
+    cr1.setServiceName(HDFS);
+    cr1.setRole(DATANODE);
+    cr1.setRoleCommand("INSTALL");
+    cr1.setStatus(HostRoleStatus.FAILED.toString());
+    cr1.setStdErr("none");
+    cr1.setStdOut("dummy output");
+    cr1.setExitCode(0);
+
+    CommandReport cr2 = new CommandReport();
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setTaskId(2);
+    cr2.setClusterName(DummyCluster);
+    cr2.setServiceName(HDFS);
+    cr2.setRole(NAMENODE);
+    cr2.setRoleCommand("INSTALL");
+    cr2.setStatus(HostRoleStatus.FAILED.toString());
+    cr2.setStdErr("none");
+    cr2.setStdOut("dummy output");
+    cr2.setExitCode(0);
+    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+    reports.add(cr1);
+    reports.add(cr2);
+    hb.setReports(reports);
+
+    ActionQueue aq = new ActionQueue();
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    assertEquals("State of SCH should change after fail report",
+        State.UPGRADING, serviceComponentHost1.getState());
+    assertEquals("State of SCH should change after fail report",
+        State.INSTALL_FAILED, serviceComponentHost2.getState());
+    assertEquals("Stack version of SCH should not change after fail report",
+        stack120, serviceComponentHost1.getStackVersion());
+    assertEquals("Stack version of SCH should not change after fail report",
+        stack130, serviceComponentHost1.getDesiredStackVersion());
+    assertEquals("Stack version of SCH should not change after fail report",
+        State.INSTALL_FAILED, serviceComponentHost2.getState());
+  }
+
+
+  /**
+   * Reports IN_PROGRESS commands for DATANODE (in UPGRADING state) and
+   * NAMENODE (in INSTALLING state) and checks that neither component's state
+   * nor the DATANODE's desired stack version changes.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId stack130 = new StackId("HDP-1.3.0");
+    StackId stack120 = new StackId("HDP-1.2.0");
+
+    serviceComponentHost1.setState(State.UPGRADING);
+    serviceComponentHost2.setState(State.INSTALLING);
+
+    serviceComponentHost1.setStackVersion(stack120);
+    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost2.setStackVersion(stack120);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    CommandReport cr1 = new CommandReport();
+    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr1.setTaskId(1);
+    cr1.setClusterName(DummyCluster);
+    cr1.setServiceName(HDFS);
+    cr1.setRole(DATANODE);
+    cr1.setRoleCommand("INSTALL");
+    cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr1.setStdErr("none");
+    cr1.setStdOut("dummy output");
+    cr1.setExitCode(777);
+
+    CommandReport cr2 = new CommandReport();
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setTaskId(2);
+    cr2.setClusterName(DummyCluster);
+    cr2.setServiceName(HDFS);
+    cr2.setRole(NAMENODE);
+    cr2.setRoleCommand("INSTALL");
+    cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr2.setStdErr("none");
+    cr2.setStdOut("dummy output");
+    cr2.setExitCode(777);
+    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+    reports.add(cr1);
+    reports.add(cr2);
+    hb.setReports(reports);
+
+    ActionQueue aq = new ActionQueue();
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>() {{
+          add(command);
+          add(command);
+        }});
+    replay(am);
+
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+    // Drive the processor directly, as the other tests in this class do, so
+    // the reports are processed before the assertions below run.
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    heartbeatProcessor.processHeartbeat(hb);
+
+    assertEquals("State of SCH should not change while operation is in progress",
+        State.UPGRADING, serviceComponentHost1.getState());
+    assertEquals("Desired stack version of SCH should not change after in progress report",
+        stack130, serviceComponentHost1.getDesiredStackVersion());
+    assertEquals("State of SCH should not change while operation is in progress",
+        State.INSTALLING, serviceComponentHost2.getState());
+  }
+
+
+  /**
+   * Tests that if there is an invalid cluster in heartbeat data, the heartbeat
+   * doesn't fail. The heartbeat carries a single alert whose cluster name
+   * ("BADCLUSTER") is not the dummy cluster created for the test; the test
+   * passes when processing it does not throw.
+   *
+   * @throws Exception if registration or heartbeat processing fails
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHeartBeatWithAlertAndInvalidCluster() throws Exception {
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        new ArrayList<HostRoleCommand>());
+
+    replay(am);
+
+    // Called for its side effect of creating the dummy cluster and host.
+    heartbeatTestHelper.getDummyCluster();
+    Host hostObject = clusters.getHost(DummyHostname1);
+    hostObject.setIPv4("ipv4");
+    hostObject.setIPv6("ipv6");
+    hostObject.setOsType(DummyOsType);
+
+    ActionQueue aq = new ActionQueue();
+
+    HeartBeatHandler handler = new HeartBeatHandler(clusters, aq, am, injector);
+    Register reg = new Register();
+    HostInfo hi = new HostInfo();
+    hi.setHostName(DummyHostname1);
+    hi.setOS(DummyOs);
+    hi.setOSRelease(DummyOSRelease);
+    reg.setHostname(DummyHostname1);
+    reg.setHardwareProfile(hi);
+    reg.setAgentVersion(metaInfo.getServerVersion());
+    handler.handleRegistration(reg);
+
+    hostObject.setState(HostState.UNHEALTHY);
+
+    // Enqueue the command configured here; previously a fresh, unconfigured
+    // ExecutionCommand was enqueued and this one was never used.
+    ExecutionCommand execCmd = new ExecutionCommand();
+    execCmd.setRequestAndStage(2, 34);
+    execCmd.setHostname(DummyHostname1);
+    aq.enqueue(DummyHostname1, execCmd);
+
+    HeartBeat hb = new HeartBeat();
+    HostStatus hs = new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus);
+
+    hb.setResponseId(0);
+    hb.setNodeStatus(hs);
+    hb.setHostname(DummyHostname1);
+
+    Alert alert = new Alert("foo", "bar", "baz", "foobar", "foobarbaz",
+        AlertState.OK);
+
+    alert.setCluster("BADCLUSTER");
+
+    List<Alert> alerts = Collections.singletonList(alert);
+    hb.setAlerts(alerts);
+
+    // should NOT throw AmbariException from alerts.
+    handler.getHeartbeatProcessor().processHeartbeat(hb);
+  }
+
+
+  /**
+   * Processes a COMPLETED "install_packages" command report whose structured
+   * output names actual version "2.2.1.0-2222" for installed repository
+   * version "0.1", and checks through {@link RepositoryVersionDAO} that the
+   * "0.1" entry is no longer found while a "2.2.1.0-2222" entry is.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testInstallPackagesWithVersion() throws Exception {
+    // required since this test method checks the DAO result of handling a
+    // heartbeat which performs some async tasks
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+
+    final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+        Role.DATANODE, null, null);
+
+    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+        Collections.singletonList(command)).anyTimes();
+    replay(am);
+
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, new ActionQueue());
+    HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+    HeartBeat hb = new HeartBeat();
+
+    // Structured output of the install_packages command.
+    JsonObject json = new JsonObject();
+    json.addProperty("actual_version", "2.2.1.0-2222");
+    json.addProperty("package_installation_result", "SUCCESS");
+    json.addProperty("installed_repository_version", "0.1");
+    json.addProperty("stack_id", cluster.getDesiredStackVersion().getStackId());
+
+
+    CommandReport cmdReport = new CommandReport();
+    cmdReport.setActionId(StageUtils.getActionId(requestId, stageId));
+    cmdReport.setTaskId(1);
+    cmdReport.setCustomCommand("install_packages");
+    cmdReport.setStructuredOut(json.toString());
+    cmdReport.setRoleCommand(RoleCommand.ACTIONEXECUTE.name());
+    cmdReport.setStatus(HostRoleStatus.COMPLETED.name());
+    cmdReport.setRole("install_packages");
+    cmdReport.setClusterName(DummyCluster);
+
+    hb.setReports(Collections.singletonList(cmdReport));
+    hb.setTimestamp(0L);
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+    hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+    StackId stackId = new StackId("HDP", "0.1");
+
+    // Precondition: the "0.1" repository version exists before the heartbeat.
+    RepositoryVersionDAO dao = injector.getInstance(RepositoryVersionDAO.class);
+    RepositoryVersionEntity entity = dao.findByStackAndVersion(stackId, "0.1");
+    Assert.assertNotNull(entity);
+
+    heartbeatProcessor.processHeartbeat(hb);
+
+    entity = dao.findByStackAndVersion(stackId, "0.1");
+    Assert.assertNull(entity);
+
+    entity = dao.findByStackAndVersion(stackId, "2.2.1.0-2222");
+    Assert.assertNotNull(entity);
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
new file mode 100644
index 0000000..02974ca
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.persist.UnitOfWork;
+import junit.framework.Assert;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.ResourceEntity;
+import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.security.authorization.ResourceType;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+
+/**
+ * Shared fixture helper for heartbeat tests. It provides the Guice test
+ * module, a {@link HeartBeatHandler} with the dummy host already registered,
+ * a partially mocked {@link ActionManager}, a dummy cluster containing one
+ * host, and a way to persist a sample request into an action DB.
+ */
+@Singleton
+public class HeartbeatTestHelper {
+
+  @Inject
+  Clusters clusters;
+
+  @Inject
+  Injector injector;
+
+  @Inject
+  AmbariMetaInfo metaInfo;
+
+  @Inject
+  ActionDBAccessor actionDBAccessor;
+
+  @Inject
+  ClusterDAO clusterDAO;
+
+  @Inject
+  StackDAO stackDAO;
+
+  @Inject
+  UnitOfWork unitOfWork;
+
+  @Inject
+  ResourceTypeDAO resourceTypeDAO;
+
+  @Inject
+  OrmTestHelper helper;
+
+  @Inject
+  private HostDAO hostDAO;
+
+  @Inject
+  private StageFactory stageFactory;
+
+  /** Stack looked up from the stack DAO when the dummy cluster entity is created. */
+  public static final StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
+
+  /**
+   * Builds the in-memory test module with a fixed set of "recovery.*"
+   * properties applied before the default configuration runs.
+   *
+   * @return a new test module instance
+   */
+  public static InMemoryDefaultTestModule getTestModule() {
+    return new InMemoryDefaultTestModule(){
+
+      @Override
+      protected void configure() {
+        getProperties().put("recovery.type", "FULL");
+        getProperties().put("recovery.lifetime_max_count", "10");
+        getProperties().put("recovery.max_count", "4");
+        getProperties().put("recovery.window_in_minutes", "23");
+        getProperties().put("recovery.retry_interval", "2");
+        super.configure();
+      }
+    };
+  }
+
+  /**
+   * Creates a {@link HeartBeatHandler} and registers the dummy host with it.
+   *
+   * @param am action manager handed to the handler
+   * @param aq action queue handed to the handler
+   * @return the handler, after registration of {@code DummyHostname1}
+   * @throws InvalidStateTransitionException propagated from registration
+   * @throws AmbariException propagated from registration
+   */
+  public HeartBeatHandler getHeartBeatHandler(ActionManager am, ActionQueue aq)
+      throws InvalidStateTransitionException, AmbariException {
+    HeartBeatHandler handler = new HeartBeatHandler(clusters, aq, am, injector);
+    Register reg = new Register();
+    HostInfo hi = new HostInfo();
+    hi.setHostName(DummyHostname1);
+    hi.setOS(DummyOs);
+    hi.setOSRelease(DummyOSRelease);
+    reg.setHostname(DummyHostname1);
+    reg.setResponseId(0);
+    reg.setHardwareProfile(hi);
+    reg.setAgentVersion(metaInfo.getServerVersion());
+    handler.handleRegistration(reg);
+    return handler;
+  }
+
+  /**
+   * Builds a partial EasyMock mock of {@link ActionManager} in which only
+   * {@code getTasks} is mocked. The mock is returned in record state, so the
+   * caller must set its expectations and call {@code replay}.
+   *
+   * @return the partially mocked action manager
+   */
+  public ActionManager getMockActionManager() {
+    ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
+    Clusters clustersMock = createNiceMock(Clusters.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+
+    ActionManager actionManager = createMockBuilder(ActionManager.class).
+        addMockedMethod("getTasks").
+        withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
+            actionDBAccessor, new HostsMap((String) null), unitOfWork,
+            injector.getInstance(RequestFactory.class), configurationMock, createNiceMock(AmbariEventPublisher.class)).
+        createMock();
+    return actionManager;
+  }
+
+  /**
+   * Creates and persists the dummy cluster, sets its desired and current
+   * stack versions, creates a cluster version in UPGRADING state, and adds
+   * the dummy host to the cluster.
+   *
+   * @return the created cluster
+   * @throws AmbariException propagated from cluster or host operations
+   */
+  public Cluster getDummyCluster()
+      throws AmbariException {
+    StackEntity stackEntity = stackDAO.find(HDP_22_STACK.getStackName(), HDP_22_STACK.getStackVersion());
+    org.junit.Assert.assertNotNull(stackEntity);
+
+    // Create the cluster
+    ResourceTypeEntity resourceTypeEntity = new ResourceTypeEntity();
+    resourceTypeEntity.setId(ResourceType.CLUSTER.getId());
+    resourceTypeEntity.setName(ResourceType.CLUSTER.name());
+    resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity);
+
+    ResourceEntity resourceEntity = new ResourceEntity();
+    resourceEntity.setResourceType(resourceTypeEntity);
+
+    ClusterEntity clusterEntity = new ClusterEntity();
+    clusterEntity.setClusterName(DummyCluster);
+    clusterEntity.setClusterInfo("test_cluster_info1");
+    clusterEntity.setResource(resourceEntity);
+    clusterEntity.setDesiredStack(stackEntity);
+
+    clusterDAO.create(clusterEntity);
+
+    StackId stackId = new StackId(DummyStackId);
+
+    Cluster cluster = clusters.getCluster(DummyCluster);
+
+    cluster.setDesiredStackVersion(stackId);
+    cluster.setCurrentStackVersion(stackId);
+    helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
+    cluster.createClusterVersion(stackId, stackId.getStackVersion(), "admin",
+        RepositoryVersionState.UPGRADING);
+
+    Set<String> hostNames = new HashSet<String>();
+    hostNames.add(DummyHostname1);
+
+    Map<String, String> hostAttributes = new HashMap<String, String>();
+    hostAttributes.put("os_family", "redhat");
+    hostAttributes.put("os_release_version", "6.3");
+
+    // Add and persist every host, collecting the persisted entities so they
+    // can be attached to the cluster entity.
+    List<HostEntity> hostEntities = new ArrayList<HostEntity>();
+    for(String hostName : hostNames) {
+      clusters.addHost(hostName);
+      Host host = clusters.getHost(hostName);
+      host.setHostAttributes(hostAttributes);
+      host.persist();
+
+      HostEntity hostEntity = hostDAO.findByName(hostName);
+      org.junit.Assert.assertNotNull(hostEntity);
+      hostEntities.add(hostEntity);
+    }
+    clusterEntity.setHostEntities(hostEntities);
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+    return cluster;
+  }
+
+  /**
+   * Persists a request with a single stage that holds one HBASE_MASTER START
+   * command for the given host.
+   *
+   * @param db accessor the request is persisted through
+   * @param hostname host the command is addressed to
+   * @param requestId request id used for the stage
+   * @param stageId stage id assigned to the stage
+   * @throws AmbariException propagated from stage creation or persistence
+   */
+  public void populateActionDB(ActionDBAccessor db, String hostname, long requestId, long stageId) throws AmbariException {
+    Stage s = stageFactory.createNew(requestId, "/a/b", DummyCluster, 1L, "heartbeat handler test",
+        "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+    s.setStageId(stageId);
+    s.addHostRoleExecutionCommand(hostname, Role.HBASE_MASTER,
+        RoleCommand.START,
+        new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
+            hostname, System.currentTimeMillis()), DummyCluster, HBASE, false, false);
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    Request request = new Request(stages, clusters);
+    db.persistActions(request);
+  }
+
+}