You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/08/12 18:25:46 UTC

git commit: AMBARI-6828. Postfixes for server-side implementation of Cancel requests (dlysnichenko)

Repository: ambari
Updated Branches:
  refs/heads/trunk 382e7f3df -> 9bbb43e57


AMBARI-6828. Postfixes for server-side implementation of Cancel requests (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9bbb43e5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9bbb43e5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9bbb43e5

Branch: refs/heads/trunk
Commit: 9bbb43e57bb6071fabcb23c12ffb222fb33cccdb
Parents: 382e7f3
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Fri Aug 8 15:13:24 2014 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Aug 12 19:13:38 2014 +0300

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     |   5 +
 .../server/actionmanager/ActionScheduler.java   |   2 +-
 .../actionmanager/TestActionDBAccessorImpl.java |  33 +-
 .../actionmanager/TestActionScheduler.java      | 193 +++++++++--
 .../server/agent/TestHeartbeatHandler.java      | 323 +++++++++++++++----
 5 files changed, 464 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index dae9048..5e879cc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -346,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     long now = System.currentTimeMillis();
 
     List<Long> requestsToCheck = new ArrayList<Long>();
+    List<Long> abortedCommandUpdates = new ArrayList<Long>();
 
     List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
     for (HostRoleCommandEntity commandEntity : commandEntities) {
@@ -354,6 +355,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         // We don't want to overwrite statuses for ABORTED tasks with
         // statuses that have been received from the agent after aborting task
         commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+      } else {
+        abortedCommandUpdates.add(commandEntity.getTaskId());
       }
       commandEntity.setStdOut(report.getStdOut().getBytes());
       commandEntity.setStdError(report.getStdErr().getBytes());
@@ -375,6 +378,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     }
 
     hostRoleCommandDAO.mergeAll(commandEntities);
+    // Invalidate cache because of updates to ABORTED commands
+    hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
 
     for (Long requestId : requestsToCheck) {
       endRequestIfCompleted(requestId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index cab891f..b9a67b7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -794,7 +794,7 @@ class ActionScheduler implements Runnable {
    * @param hostRoleCommands a list of hostRoleCommands
    * @param reason why the request is being cancelled
    */
-  private void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) {
+  void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) {
     for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
       if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED) {
         // Dequeue all tasks that have been already scheduled for sending to agent

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index a94f421..2850897 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -120,6 +120,37 @@ public class TestActionDBAccessorImpl {
     Stage s = db.getAllStages(requestId).get(0);
     assertEquals(HostRoleStatus.COMPLETED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
   }
+
+  @Test
+  public void testCancelCommandReport() throws AmbariException {
+    String hostname = "host1";
+    populateActionDB(db, hostname, requestId, stageId);
+    Stage stage = db.getAllStages(requestId).get(0);
+    Assert.assertEquals(stageId, stage.getStageId());
+    stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.ABORTED);
+    db.hostRoleScheduled(stage, hostname, "HBASE_MASTER");
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setTaskId(1);
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setRole("HBASE_MASTER");
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(0);
+    reports.add(cr);
+    am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+    assertEquals(0,
+            am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
+    assertEquals("HostRoleStatus should remain ABORTED " +
+            "(command report status should be ignored)",
+            HostRoleStatus.ABORTED, am.getAction(requestId, stageId)
+            .getHostRoleStatus(hostname, "HBASE_MASTER"));
+    Stage s = db.getAllStages(requestId).get(0);
+    assertEquals("HostRoleStatus should remain ABORTED " +
+            "(command report status should be ignored)",
+            HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
+  }
   
   @Test
   public void testGetStagesInProgress() throws AmbariException {
@@ -129,8 +160,6 @@ public class TestActionDBAccessorImpl {
     stages.add(createStubStage(hostname, requestId, stageId + 1));
     Request request = new Request(stages, clusters);
     db.persistActions(request);
-
-    List<Stage> stages2 = db.getStagesInProgress();
     assertEquals(2, stages.size());
   }
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 60ed592..a536bef 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -44,13 +45,14 @@ import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
+import org.apache.ambari.server.agent.CancelCommand;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
-import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.AmbariCustomCommandExecutionHelper;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerAction;
+import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -60,11 +62,12 @@ import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
-import org.apache.ambari.server.state.ServiceComponentHostEventType;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -82,6 +85,7 @@ public class TestActionScheduler {
   private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org]}";
+  private final String hostname = "ahost.ambari.apache.org";
 
 
   /**
@@ -111,7 +115,6 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     
-    String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -193,7 +196,6 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -258,7 +260,6 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -286,10 +287,15 @@ public class TestActionScheduler {
       }
     }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
 
+    ServerActionManager sam = EasyMock.createNiceMock(ServerActionManager.class);
 
     //Small action timeout to test rescheduling
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-      new HostsMap((String) null), null, unitOfWork, conf);
+    ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
+        withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
+                new HostsMap((String) null), sam, unitOfWork, conf).
+        addMockedMethod("cancelHostRoleCommands").
+        createMock();
+    EasyMock.replay(scheduler);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -298,10 +304,11 @@ public class TestActionScheduler {
       .equals(HostRoleStatus.TIMEDOUT)) {
       Thread.sleep(100L);
     }
-//    assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"),
-//      HostRoleStatus.TIMEDOUT);
 
     scheduler.stop();
+
+    //Check that cancelHostRoleCommands() was not called
+    EasyMock.verify(scheduler);
   }
 
   @Test
@@ -465,7 +472,6 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
-    String hostname = "ahost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
     hosts.put(hostname, sch);
@@ -527,7 +533,6 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
-    String hostname = "ahost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
     hosts.put(hostname, sch);
@@ -830,7 +835,6 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
-    String hostname = "ahost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
     hosts.put(hostname, sch);
@@ -915,11 +919,22 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-        unitOfWork, conf);
+    ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
+
+    Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>();
+    ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
+        withConstructor((long)100, (long)50, db, aq, fsm, 3,
+          new HostsMap((String) null), serverActionManager,
+          unitOfWork, conf).
+          addMockedMethod("cancelHostRoleCommands").
+          createMock();
+    scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList),
+            EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING));
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(scheduler);
+
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -930,6 +945,8 @@ public class TestActionScheduler {
     scheduler.doWork();
     Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
     Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+    Assert.assertEquals(cancelCommandList.getValue().size(), 1);
+    EasyMock.verify(scheduler);
   }
 
   /**
@@ -1398,7 +1415,6 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -1556,7 +1572,6 @@ public class TestActionScheduler {
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
-    String hostname = "ahost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
     hosts.put(hostname, sch);
@@ -1598,8 +1613,148 @@ public class TestActionScheduler {
     scheduler.stop();
     assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"),
             HostRoleStatus.COMPLETED);
+  }
+
+  @Test
+  public void testCancelRequests() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    RequestFactory requestFactory = mock(RequestFactory.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    long requestId = 1;
+
+    final List<Stage> stages = new ArrayList<Stage>();
+    int namenodeCmdTaskId = 1;
+    stages.add(
+            getStageWithSingleTask(
+                    hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
+                    Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
+    stages.add(
+            getStageWithSingleTask(
+                    hostname, "cluster1", Role.DATANODE, RoleCommand.START,
+                    Service.Type.HDFS, 2, 2, (int)requestId));
+
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(hostname);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+    for (Stage stage : stages) {
+      requestTasks.addAll(stage.getOrderedHostRoleCommands());
+    }
+    when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
+    when(db.getAllStages(anyLong())).thenReturn(stages);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          String role = report.getRole();
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+                if (hostRoleCommand.getTaskId() == id) {
+                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
+          }
 
+        }
+
+        return null;
+      }
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long taskId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+            if (taskId.equals(command.getTaskId())) {
+              return command;
+            }
+          }
+        }
+        return null;
+      }
+    });
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          if (requestId.equals(stage.getRequestId())) {
+            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+              if (command.getStatus() == HostRoleStatus.QUEUED ||
+                      command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+                      command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
+
+    ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
+                    new HostsMap((String) null), serverActionManager, unitOfWork, conf);
+
+    ActionManager am = new ActionManager(
+            2, 2, aq, fsm, db, new HostsMap((String) null),
+            serverActionManager, unitOfWork, requestFactory, conf);
+
+    scheduler.doWork();
+
+
+
+    //List<CommandReport> reports = new ArrayList<CommandReport>();
+    //reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1));
+    //am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands());
+    String reason = "Some reason";
+
+    scheduler.scheduleCancellingRequest(requestId, reason);
+
+    scheduler.doWork();
+    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+    Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be generated only for 1 stage
 
+    CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname);
+    Assert.assertEquals(cancelCommand.getTargetTaskId(), namenodeCmdTaskId);
+    Assert.assertEquals(cancelCommand.getReason(), reason);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 349f09d..5c4a4f1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -60,6 +61,7 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.RequestFactory;
@@ -70,6 +72,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
@@ -88,9 +91,9 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.codehaus.jackson.JsonGenerationException;
+import static org.easymock.EasyMock.*;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,8 +139,11 @@ public class TestHeartbeatHandler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testHeartbeat() throws Exception {
     ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>());
+    replay(am);
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
     Host hostObject = clusters.getHost(DummyHostname1);
@@ -190,10 +196,8 @@ public class TestHeartbeatHandler {
   }
   
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testHeartbeatWithConfigs() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
     
     @SuppressWarnings("serial")
@@ -211,8 +215,7 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
     
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-    
+
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
             getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
     ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
@@ -220,7 +223,6 @@ public class TestHeartbeatHandler {
     serviceComponentHost1.setState(State.INSTALLED);
     serviceComponentHost2.setState(State.INSTALLED);
 
-
     HeartBeat hb = new HeartBeat();
     hb.setResponseId(0);
     hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
@@ -245,7 +247,18 @@ public class TestHeartbeatHandler {
     
     reports.add(cr);
     hb.setReports(reports);
-    
+
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
 
     // the heartbeat test passed if actual configs is populated
@@ -254,10 +267,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testHeartbeatCustomCommandWithConfigs() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -275,7 +286,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
         getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -284,7 +294,6 @@ public class TestHeartbeatHandler {
     serviceComponentHost1.setState(State.INSTALLED);
     serviceComponentHost2.setState(State.INSTALLED);
 
-
     HeartBeat hb = new HeartBeat();
     hb.setResponseId(0);
     hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
@@ -326,6 +335,18 @@ public class TestHeartbeatHandler {
     reports.add(crn);
     hb.setReports(reports);
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+      Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
 
     // the heartbeat test passed if actual configs is populated
@@ -336,10 +357,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testHeartbeatCustomStartStop() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -357,7 +376,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
         getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -405,6 +423,18 @@ public class TestHeartbeatHandler {
 
     assertTrue(serviceComponentHost1.isRestartRequired());
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
 
     // the heartbeat test passed if actual configs is populated
@@ -417,9 +447,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testStatusHeartbeat() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -437,7 +466,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
             getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -472,6 +500,18 @@ public class TestHeartbeatHandler {
     componentStatuses.add(componentStatus2);
     hb.setComponentStatus(componentStatuses);
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     State componentState1 = serviceComponentHost1.getState();
     State componentState2 = serviceComponentHost2.getState();
@@ -482,9 +522,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testStatusHeartbeatWithAnnotation() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -499,7 +538,6 @@ public class TestHeartbeatHandler {
     hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     HeartBeat hb = new HeartBeat();
     hb.setTimestamp(System.currentTimeMillis());
@@ -510,6 +548,17 @@ public class TestHeartbeatHandler {
     ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
     hb.setComponentStatus(componentStatuses);
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }}).anyTimes();
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     HeartBeatResponse resp = handler.handleHeartBeat(hb);
     Assert.assertFalse(resp.hasMappedComponents());
 
@@ -531,8 +580,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testLiveStatusUpdateAfterStopFailed() throws Exception {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -550,7 +599,6 @@ public class TestHeartbeatHandler {
             addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.
             getCluster(DummyCluster).getService(HDFS).
@@ -589,6 +637,18 @@ public class TestHeartbeatHandler {
 
     hb.setComponentStatus(componentStatuses);
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     State componentState1 = serviceComponentHost1.getState();
     State componentState2 = serviceComponentHost2.getState();
@@ -656,6 +716,7 @@ public class TestHeartbeatHandler {
   public void testRegistration() throws AmbariException,
       InvalidStateTransitionException {
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -687,6 +748,7 @@ public class TestHeartbeatHandler {
       InvalidStateTransitionException {
 
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -727,6 +789,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -759,6 +822,7 @@ public class TestHeartbeatHandler {
   public void testInvalidOSRegistration() throws AmbariException,
       InvalidStateTransitionException {
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -787,6 +851,7 @@ public class TestHeartbeatHandler {
           InvalidStateTransitionException {
 
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
             injector);
@@ -814,6 +879,7 @@ public class TestHeartbeatHandler {
   public void testRegisterNewNode()
       throws AmbariException, InvalidStateTransitionException {
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
     Host hostObject = clusters.getHost(DummyHostname1);
@@ -906,6 +972,7 @@ public class TestHeartbeatHandler {
     when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
 
     ActionManager am = getMockActionManager();
+    replay(am);
     Clusters fsm = clusters;
     ActionQueue actionQueue = new ActionQueue();
     HeartBeatHandler handler = new HeartBeatHandler(fsm, actionQueue, am,
@@ -930,9 +997,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -950,7 +1016,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
             getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -977,14 +1042,25 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
     hb.setComponentStatus(new ArrayList<ComponentStatus>());
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     State componentState1 = serviceComponentHost1.getState();
     assertEquals("Host state should still be installing", State.INSTALLING, componentState1);
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testOPFailedEventForAbortedTask() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1002,7 +1078,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
       getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1041,6 +1116,18 @@ public class TestHeartbeatHandler {
     reports.add(cr);
     hb.setReports(reports);
     hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     State componentState1 = serviceComponentHost1.getState();
     assertEquals("Host state should still be installing", State.INSTALLING,
@@ -1055,10 +1142,9 @@ public class TestHeartbeatHandler {
    * @throws InvalidStateTransitionException
    */
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testCommandReportOnHeartbeatUpdatedState()
       throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1072,7 +1158,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
         getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1100,6 +1185,17 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
     hb.setComponentStatus(new ArrayList<ComponentStatus>());
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }}).anyTimes();
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("Host state should  be " + State.INSTALLED,
         State.INSTALLED, serviceComponentHost1.getState());
@@ -1171,9 +1267,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1187,7 +1282,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
 
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
         getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1214,6 +1308,17 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
     hb.setComponentStatus(new ArrayList<ComponentStatus>());
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }}).anyTimes();
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("Host state should  be " + State.UPGRADING,
         State.UPGRADING, serviceComponentHost1.getState());
@@ -1237,8 +1342,6 @@ public class TestHeartbeatHandler {
     assertEquals("Host state should be " + State.UPGRADING,
         State.UPGRADING, serviceComponentHost1.getState());
 
-    // TODO What happens when there is a TIMEDOUT
-
     serviceComponentHost1.setState(State.UPGRADING);
     hb.setTimestamp(System.currentTimeMillis());
     hb.setResponseId(3);
@@ -1261,8 +1364,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testStatusHeartbeatWithVersion() throws Exception {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1321,6 +1424,11 @@ public class TestHeartbeatHandler {
     hb.setComponentStatus(componentStatuses);
 
     ActionQueue aq = new ActionQueue();
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+            }});
+    replay(am);
     HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
@@ -1333,9 +1441,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1403,6 +1510,17 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
 
     ActionQueue aq = new ActionQueue();
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
     HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("Stack version for SCH should be updated to " +
@@ -1413,9 +1531,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1481,6 +1598,17 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
 
     ActionQueue aq = new ActionQueue();
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
     HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("State of SCH not change while operation is in progress",
@@ -1493,8 +1621,8 @@ public class TestHeartbeatHandler {
 
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1575,9 +1703,6 @@ public class TestHeartbeatHandler {
     cr1.setStdOut("dummy output");
     cr1.setExitCode(0);
 
-//    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-//      Role.DATANODE.name(), cr1);
-
     CommandReport cr2 = new CommandReport();
     cr2.setActionId(StageUtils.getActionId(requestId, stageId));
     cr2.setTaskId(2);
@@ -1593,10 +1718,18 @@ public class TestHeartbeatHandler {
     reports.add(cr2);
     hb.setReports(reports);
 
-//    actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-//      Role.NAMENODE.name(), cr2);
-
     ActionQueue aq = new ActionQueue();
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
     HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     assertEquals("State of SCH should change after fail report",
@@ -1612,9 +1745,8 @@ public class TestHeartbeatHandler {
   }
   
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testProcessStatusReports() throws Exception {
-    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
 
     Cluster cluster = getDummyCluster();
@@ -1631,7 +1763,17 @@ public class TestHeartbeatHandler {
 
     ActionQueue aq = new ActionQueue();
 
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }}).anyTimes();
+    replay(am);
     HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+
     Register reg = new Register();
     HostInfo hi = new HostInfo();
     hi.setHostName(DummyHostname1);
@@ -1744,6 +1886,13 @@ public class TestHeartbeatHandler {
     handler.handleHeartBeat(hb1);
     assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
 
+    reset(am);
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }}).anyTimes();
+    replay(am);
+
     //Only one component reported status
     hdfs.getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1).setState(State.INSTALLED);
     HeartBeat hb4 = new HeartBeat();
@@ -1789,10 +1938,8 @@ public class TestHeartbeatHandler {
   }
 
   @Test
-  @Ignore //TODO (dlysnichenko) : fix
+  @SuppressWarnings("unchecked")
   public void testIgnoreCustomActionReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = getMockActionManager();
-
     CommandReport cr1 = new CommandReport();
     cr1.setActionId(StageUtils.getActionId(requestId, stageId));
     cr1.setTaskId(1);
@@ -1828,6 +1975,18 @@ public class TestHeartbeatHandler {
     hb.setReports(reports);
 
     ActionQueue aq = new ActionQueue();
+
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+              add(command);
+            }});
+    replay(am);
+
     HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     // CUSTOM_COMMAND and ACTIONEXECUTE reports are ignored
@@ -1888,9 +2047,18 @@ public class TestHeartbeatHandler {
   }
 
   private ActionManager getMockActionManager() {
-    return new ActionManager(0, 0, null, null,
-        actionDBAccessor, new HostsMap((String) null), null, unitOfWork,
-        injector.getInstance(RequestFactory.class), null);
+    ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
+    Clusters clustersMock = createNiceMock(Clusters.class);
+    ServerActionManager serverActionManagerMock = createNiceMock(ServerActionManager.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+
+    ActionManager actionManager = createMockBuilder(ActionManager.class).
+            addMockedMethod("getTasks").
+            withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
+                    actionDBAccessor, new HostsMap((String) null), serverActionManagerMock, unitOfWork,
+                    injector.getInstance(RequestFactory.class), configurationMock).
+            createMock();
+    return actionManager;
   }
 
 
@@ -1944,9 +2112,8 @@ public class TestHeartbeatHandler {
   }
   
   @Test
+  @SuppressWarnings("unchecked")
   public void testCommandStatusProcesses() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
     Host hostObject = clusters.getHost(DummyHostname1);
     clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName());
@@ -1957,7 +2124,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
     
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
 
     HeartBeat hb = new HeartBeat();
     hb.setTimestamp(System.currentTimeMillis());
@@ -1965,7 +2131,6 @@ public class TestHeartbeatHandler {
     hb.setHostname(DummyHostname1);
     hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
     hb.setReports(new ArrayList<CommandReport>());
-    
 
     List<Map<String, String>> procs = new ArrayList<Map<String, String>>();
     Map<String, String> proc1info = new HashMap<String, String>();
@@ -1992,7 +2157,18 @@ public class TestHeartbeatHandler {
     componentStatus1.setExtra(extra);
     componentStatuses.add(componentStatus1);
     hb.setComponentStatus(componentStatuses);
-    
+
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }}).anyTimes();
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
     handler.handleHeartBeat(hb);
     ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); 
 
@@ -2028,9 +2204,8 @@ public class TestHeartbeatHandler {
   }
   
   @Test
+  @SuppressWarnings("unchecked")
   public void testCommandStatusProcesses_empty() throws Exception {
-    ActionManager am = getMockActionManager();
-
     Cluster cluster = getDummyCluster();
     Host hostObject = clusters.getHost(DummyHostname1);
     clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName());
@@ -2041,8 +2216,6 @@ public class TestHeartbeatHandler {
     hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
     
     ActionQueue aq = new ActionQueue();
-    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-
     HeartBeat hb = new HeartBeat();
     hb.setTimestamp(System.currentTimeMillis());
     hb.setResponseId(0);
@@ -2060,9 +2233,19 @@ public class TestHeartbeatHandler {
     
     componentStatuses.add(componentStatus1);
     hb.setComponentStatus(componentStatuses);
-    
-    handler.handleHeartBeat(hb);
-    ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); 
+
+    final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+            Role.DATANODE, null, null);
+
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(
+            new ArrayList<HostRoleCommand>() {{
+              add(command);
+            }});
+    replay(am);
+
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
 
     Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(sch.getProcesses().size()));
   }