You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/08/12 18:25:46 UTC
git commit: AMBARI-6828. Postfixes for server-side implementation of
Cancel requests (dlysnichenko)
Repository: ambari
Updated Branches:
refs/heads/trunk 382e7f3df -> 9bbb43e57
AMBARI-6828. Postfixes for server-side implementation of Cancel requests (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9bbb43e5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9bbb43e5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9bbb43e5
Branch: refs/heads/trunk
Commit: 9bbb43e57bb6071fabcb23c12ffb222fb33cccdb
Parents: 382e7f3
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Fri Aug 8 15:13:24 2014 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Aug 12 19:13:38 2014 +0300
----------------------------------------------------------------------
.../actionmanager/ActionDBAccessorImpl.java | 5 +
.../server/actionmanager/ActionScheduler.java | 2 +-
.../actionmanager/TestActionDBAccessorImpl.java | 33 +-
.../actionmanager/TestActionScheduler.java | 193 +++++++++--
.../server/agent/TestHeartbeatHandler.java | 323 +++++++++++++++----
5 files changed, 464 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index dae9048..5e879cc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -346,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
long now = System.currentTimeMillis();
List<Long> requestsToCheck = new ArrayList<Long>();
+ List<Long> abortedCommandUpdates = new ArrayList<Long>();
List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
for (HostRoleCommandEntity commandEntity : commandEntities) {
@@ -354,6 +355,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
// We don't want to overwrite statuses for ABORTED tasks with
// statuses that have been received from the agent after aborting task
commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+ } else {
+ abortedCommandUpdates.add(commandEntity.getTaskId());
}
commandEntity.setStdOut(report.getStdOut().getBytes());
commandEntity.setStdError(report.getStdErr().getBytes());
@@ -375,6 +378,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
hostRoleCommandDAO.mergeAll(commandEntities);
+ // Invalidate cache because of updates to ABORTED commands
+ hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
for (Long requestId : requestsToCheck) {
endRequestIfCompleted(requestId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index cab891f..b9a67b7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -794,7 +794,7 @@ class ActionScheduler implements Runnable {
* @param hostRoleCommands a list of hostRoleCommands
* @param reason why the request is being cancelled
*/
- private void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) {
+ void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED) {
// Dequeue all tasks that have been already scheduled for sending to agent
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index a94f421..2850897 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -120,6 +120,37 @@ public class TestActionDBAccessorImpl {
Stage s = db.getAllStages(requestId).get(0);
assertEquals(HostRoleStatus.COMPLETED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
}
+
+ @Test
+ public void testCancelCommandReport() throws AmbariException {
+ String hostname = "host1";
+ populateActionDB(db, hostname, requestId, stageId);
+ Stage stage = db.getAllStages(requestId).get(0);
+ Assert.assertEquals(stageId, stage.getStageId());
+ stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.ABORTED);
+ db.hostRoleScheduled(stage, hostname, "HBASE_MASTER");
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setTaskId(1);
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setRole("HBASE_MASTER");
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(0);
+ reports.add(cr);
+ am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+ assertEquals(0,
+ am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
+ assertEquals("HostRoleStatus should remain ABORTED " +
+ "(command report status should be ignored)",
+ HostRoleStatus.ABORTED, am.getAction(requestId, stageId)
+ .getHostRoleStatus(hostname, "HBASE_MASTER"));
+ Stage s = db.getAllStages(requestId).get(0);
+ assertEquals("HostRoleStatus should remain ABORTED " +
+ "(command report status should be ignored)",
+ HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
+ }
@Test
public void testGetStagesInProgress() throws AmbariException {
@@ -129,8 +160,6 @@ public class TestActionDBAccessorImpl {
stages.add(createStubStage(hostname, requestId, stageId + 1));
Request request = new Request(stages, clusters);
db.persistActions(request);
-
- List<Stage> stages2 = db.getStagesInProgress();
assertEquals(2, stages.size());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 60ed592..a536bef 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -44,13 +45,14 @@ import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
+import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
-import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.AmbariCustomCommandExecutionHelper;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerAction;
+import org.apache.ambari.server.serveraction.ServerActionManager;
import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
@@ -60,11 +62,12 @@ import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
-import org.apache.ambari.server.state.ServiceComponentHostEventType;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -82,6 +85,7 @@ public class TestActionScheduler {
private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org,"
+ " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
+ " c6402.ambari.apache.org]}";
+ private final String hostname = "ahost.ambari.apache.org";
/**
@@ -111,7 +115,6 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
@@ -193,7 +196,6 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
@@ -258,7 +260,6 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
@@ -286,10 +287,15 @@ public class TestActionScheduler {
}
}).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
+ ServerActionManager sam = EasyMock.createNiceMock(ServerActionManager.class);
//Small action timeout to test rescheduling
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), null, unitOfWork, conf);
+ ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
+ withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
+ new HostsMap((String) null), sam, unitOfWork, conf).
+ addMockedMethod("cancelHostRoleCommands").
+ createMock();
+ EasyMock.replay(scheduler);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
@@ -298,10 +304,11 @@ public class TestActionScheduler {
.equals(HostRoleStatus.TIMEDOUT)) {
Thread.sleep(100L);
}
-// assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"),
-// HostRoleStatus.TIMEDOUT);
scheduler.stop();
+
+ //Check that cancelHostRoleCommands() was not called
+ EasyMock.verify(scheduler);
}
@Test
@@ -465,7 +472,6 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
@@ -527,7 +533,6 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
@@ -830,7 +835,6 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
@@ -915,11 +919,22 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm),
- unitOfWork, conf);
+ ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
+
+ Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>();
+ ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
+ withConstructor((long)100, (long)50, db, aq, fsm, 3,
+ new HostsMap((String) null), serverActionManager,
+ unitOfWork, conf).
+ addMockedMethod("cancelHostRoleCommands").
+ createMock();
+ scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList),
+ EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING));
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(scheduler);
+
ActionManager am = new ActionManager(
- 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+ 2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, unitOfWork, requestFactory, conf);
scheduler.doWork();
@@ -930,6 +945,8 @@ public class TestActionScheduler {
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+ Assert.assertEquals(cancelCommandList.getValue().size(), 1);
+ EasyMock.verify(scheduler);
}
/**
@@ -1398,7 +1415,6 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
@@ -1556,7 +1572,6 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- String hostname = "ahost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
@@ -1598,8 +1613,148 @@ public class TestActionScheduler {
scheduler.stop();
assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"),
HostRoleStatus.COMPLETED);
+ }
+
+ @Test
+ public void testCancelRequests() throws Exception {
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
+ RequestFactory requestFactory = mock(RequestFactory.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+ HashMap<String, ServiceComponentHost> hosts =
+ new HashMap<String, ServiceComponentHost>();
+ hosts.put(hostname, sch);
+ when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+ long requestId = 1;
+
+ final List<Stage> stages = new ArrayList<Stage>();
+ int namenodeCmdTaskId = 1;
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
+ Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.DATANODE, RoleCommand.START,
+ Service.Type.HDFS, 2, 2, (int)requestId));
+
+ Host host = mock(Host.class);
+ when(fsm.getHost(anyString())).thenReturn(host);
+ when(host.getState()).thenReturn(HostState.HEALTHY);
+ when(host.getHostName()).thenReturn(hostname);
+
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+ when(db.getStagesInProgress()).thenReturn(stages);
+
+ List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+ for (Stage stage : stages) {
+ requestTasks.addAll(stage.getOrderedHostRoleCommands());
+ }
+ when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
+ when(db.getAllStages(anyLong())).thenReturn(stages);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+ for (CommandReport report : reports) {
+ String actionId = report.getActionId();
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
+ Long requestId = requestStageIds[0];
+ Long stageId = requestStageIds[1];
+ String role = report.getRole();
+ Long id = report.getTaskId();
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+ for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+ if (hostRoleCommand.getTaskId() == id) {
+ hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+ }
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+ when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Long taskId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (taskId.equals(command.getTaskId())) {
+ return command;
+ }
+ }
+ }
+ return null;
+ }
+ });
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Long requestId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId())) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (command.getStatus() == HostRoleStatus.QUEUED ||
+ command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+ command.getStatus() == HostRoleStatus.PENDING) {
+ command.setStatus(HostRoleStatus.ABORTED);
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ }).when(db).abortOperation(anyLong());
+
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
+ ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
+
+ ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), serverActionManager, unitOfWork, conf);
+
+ ActionManager am = new ActionManager(
+ 2, 2, aq, fsm, db, new HostsMap((String) null),
+ serverActionManager, unitOfWork, requestFactory, conf);
+
+ scheduler.doWork();
+
+
+
+ //List<CommandReport> reports = new ArrayList<CommandReport>();
+ //reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1));
+ //am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands());
+ String reason = "Some reason";
+
+ scheduler.scheduleCancellingRequest(requestId, reason);
+
+ scheduler.doWork();
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+ Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be generated only for 1 stage
+ CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname);
+ Assert.assertEquals(cancelCommand.getTargetTaskId(), namenodeCmdTaskId);
+ Assert.assertEquals(cancelCommand.getReason(), reason);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 349f09d..5c4a4f1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -60,6 +61,7 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionDBAccessor;
import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.actionmanager.Request;
import org.apache.ambari.server.actionmanager.RequestFactory;
@@ -70,6 +72,7 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.serveraction.ServerActionManager;
import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.AlertState;
import org.apache.ambari.server.state.Cluster;
@@ -88,9 +91,9 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.codehaus.jackson.JsonGenerationException;
+import static org.easymock.EasyMock.*;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,8 +139,11 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testHeartbeat() throws Exception {
ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>());
+ replay(am);
Clusters fsm = clusters;
fsm.addHost(DummyHostname1);
Host hostObject = clusters.getHost(DummyHostname1);
@@ -190,10 +196,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testHeartbeatWithConfigs() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -211,8 +215,7 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-
+
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
@@ -220,7 +223,6 @@ public class TestHeartbeatHandler {
serviceComponentHost1.setState(State.INSTALLED);
serviceComponentHost2.setState(State.INSTALLED);
-
HeartBeat hb = new HeartBeat();
hb.setResponseId(0);
hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
@@ -245,7 +247,18 @@ public class TestHeartbeatHandler {
reports.add(cr);
hb.setReports(reports);
-
+
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
// the heartbeat test passed if actual configs is populated
@@ -254,10 +267,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testHeartbeatCustomCommandWithConfigs() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -275,7 +286,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -284,7 +294,6 @@ public class TestHeartbeatHandler {
serviceComponentHost1.setState(State.INSTALLED);
serviceComponentHost2.setState(State.INSTALLED);
-
HeartBeat hb = new HeartBeat();
hb.setResponseId(0);
hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
@@ -326,6 +335,18 @@ public class TestHeartbeatHandler {
reports.add(crn);
hb.setReports(reports);
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
// the heartbeat test passed if actual configs is populated
@@ -336,10 +357,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testHeartbeatCustomStartStop() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -357,7 +376,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -405,6 +423,18 @@ public class TestHeartbeatHandler {
assertTrue(serviceComponentHost1.isRestartRequired());
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
// the heartbeat test passed if actual configs is populated
@@ -417,9 +447,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testStatusHeartbeat() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -437,7 +466,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -472,6 +500,18 @@ public class TestHeartbeatHandler {
componentStatuses.add(componentStatus2);
hb.setComponentStatus(componentStatuses);
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
State componentState1 = serviceComponentHost1.getState();
State componentState2 = serviceComponentHost2.getState();
@@ -482,9 +522,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testStatusHeartbeatWithAnnotation() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -499,7 +538,6 @@ public class TestHeartbeatHandler {
hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
HeartBeat hb = new HeartBeat();
hb.setTimestamp(System.currentTimeMillis());
@@ -510,6 +548,17 @@ public class TestHeartbeatHandler {
ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
hb.setComponentStatus(componentStatuses);
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
HeartBeatResponse resp = handler.handleHeartBeat(hb);
Assert.assertFalse(resp.hasMappedComponents());
@@ -531,8 +580,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testLiveStatusUpdateAfterStopFailed() throws Exception {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -550,7 +599,6 @@ public class TestHeartbeatHandler {
addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.
getCluster(DummyCluster).getService(HDFS).
@@ -589,6 +637,18 @@ public class TestHeartbeatHandler {
hb.setComponentStatus(componentStatuses);
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
State componentState1 = serviceComponentHost1.getState();
State componentState2 = serviceComponentHost2.getState();
@@ -656,6 +716,7 @@ public class TestHeartbeatHandler {
public void testRegistration() throws AmbariException,
InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
@@ -687,6 +748,7 @@ public class TestHeartbeatHandler {
InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
@@ -727,6 +789,7 @@ public class TestHeartbeatHandler {
@Test
public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
@@ -759,6 +822,7 @@ public class TestHeartbeatHandler {
public void testInvalidOSRegistration() throws AmbariException,
InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
@@ -787,6 +851,7 @@ public class TestHeartbeatHandler {
InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
@@ -814,6 +879,7 @@ public class TestHeartbeatHandler {
public void testRegisterNewNode()
throws AmbariException, InvalidStateTransitionException {
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
fsm.addHost(DummyHostname1);
Host hostObject = clusters.getHost(DummyHostname1);
@@ -906,6 +972,7 @@ public class TestHeartbeatHandler {
when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
ActionManager am = getMockActionManager();
+ replay(am);
Clusters fsm = clusters;
ActionQueue actionQueue = new ActionQueue();
HeartBeatHandler handler = new HeartBeatHandler(fsm, actionQueue, am,
@@ -930,9 +997,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -950,7 +1016,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -977,14 +1042,25 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
hb.setComponentStatus(new ArrayList<ComponentStatus>());
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
State componentState1 = serviceComponentHost1.getState();
assertEquals("Host state should still be installing", State.INSTALLING, componentState1);
}
@Test
+ @SuppressWarnings("unchecked")
public void testOPFailedEventForAbortedTask() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1002,7 +1078,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1041,6 +1116,18 @@ public class TestHeartbeatHandler {
reports.add(cr);
hb.setReports(reports);
hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
State componentState1 = serviceComponentHost1.getState();
assertEquals("Host state should still be installing", State.INSTALLING,
@@ -1055,10 +1142,9 @@ public class TestHeartbeatHandler {
* @throws InvalidStateTransitionException
*/
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testCommandReportOnHeartbeatUpdatedState()
throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1072,7 +1158,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1100,6 +1185,17 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
hb.setComponentStatus(new ArrayList<ComponentStatus>());
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("Host state should be " + State.INSTALLED,
State.INSTALLED, serviceComponentHost1.getState());
@@ -1171,9 +1267,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1187,7 +1282,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1214,6 +1308,17 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
hb.setComponentStatus(new ArrayList<ComponentStatus>());
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("Host state should be " + State.UPGRADING,
State.UPGRADING, serviceComponentHost1.getState());
@@ -1237,8 +1342,6 @@ public class TestHeartbeatHandler {
assertEquals("Host state should be " + State.UPGRADING,
State.UPGRADING, serviceComponentHost1.getState());
- // TODO What happens when there is a TIMEDOUT
-
serviceComponentHost1.setState(State.UPGRADING);
hb.setTimestamp(System.currentTimeMillis());
hb.setResponseId(3);
@@ -1261,8 +1364,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testStatusHeartbeatWithVersion() throws Exception {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1321,6 +1424,11 @@ public class TestHeartbeatHandler {
hb.setComponentStatus(componentStatuses);
ActionQueue aq = new ActionQueue();
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ }});
+ replay(am);
HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
@@ -1333,9 +1441,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1403,6 +1510,17 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("Stack version for SCH should be updated to " +
@@ -1413,9 +1531,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1481,6 +1598,17 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("State of SCH not change while operation is in progress",
@@ -1493,8 +1621,8 @@ public class TestHeartbeatHandler {
@Test
+ @SuppressWarnings("unchecked")
public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
Cluster cluster = getDummyCluster();
@SuppressWarnings("serial")
@@ -1575,9 +1703,6 @@ public class TestHeartbeatHandler {
cr1.setStdOut("dummy output");
cr1.setExitCode(0);
-// actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-// Role.DATANODE.name(), cr1);
-
CommandReport cr2 = new CommandReport();
cr2.setActionId(StageUtils.getActionId(requestId, stageId));
cr2.setTaskId(2);
@@ -1593,10 +1718,18 @@ public class TestHeartbeatHandler {
reports.add(cr2);
hb.setReports(reports);
-// actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
-// Role.NAMENODE.name(), cr2);
-
ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
assertEquals("State of SCH should change after fail report",
@@ -1612,9 +1745,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testProcessStatusReports() throws Exception {
- ActionManager am = getMockActionManager();
Clusters fsm = clusters;
Cluster cluster = getDummyCluster();
@@ -1631,7 +1763,17 @@ public class TestHeartbeatHandler {
ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }}).anyTimes();
+ replay(am);
HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+
Register reg = new Register();
HostInfo hi = new HostInfo();
hi.setHostName(DummyHostname1);
@@ -1744,6 +1886,13 @@ public class TestHeartbeatHandler {
handler.handleHeartBeat(hb1);
assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
+ reset(am);
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
//Only one component reported status
hdfs.getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1).setState(State.INSTALLED);
HeartBeat hb4 = new HeartBeat();
@@ -1789,10 +1938,8 @@ public class TestHeartbeatHandler {
}
@Test
- @Ignore //TODO (dlysnichenko) : fix
+ @SuppressWarnings("unchecked")
public void testIgnoreCustomActionReport() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
-
CommandReport cr1 = new CommandReport();
cr1.setActionId(StageUtils.getActionId(requestId, stageId));
cr1.setTaskId(1);
@@ -1828,6 +1975,18 @@ public class TestHeartbeatHandler {
hb.setReports(reports);
ActionQueue aq = new ActionQueue();
+
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
HeartBeatHandler handler = getHeartBeatHandler(am, aq);
// CUSTOM_COMMAND and ACTIONEXECUTE reports are ignored
@@ -1888,9 +2047,18 @@ public class TestHeartbeatHandler {
}
private ActionManager getMockActionManager() {
- return new ActionManager(0, 0, null, null,
- actionDBAccessor, new HostsMap((String) null), null, unitOfWork,
- injector.getInstance(RequestFactory.class), null);
+ ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
+ Clusters clustersMock = createNiceMock(Clusters.class);
+ ServerActionManager serverActionManagerMock = createNiceMock(ServerActionManager.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+
+ ActionManager actionManager = createMockBuilder(ActionManager.class).
+ addMockedMethod("getTasks").
+ withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
+ actionDBAccessor, new HostsMap((String) null), serverActionManagerMock, unitOfWork,
+ injector.getInstance(RequestFactory.class), configurationMock).
+ createMock();
+ return actionManager;
}
@@ -1944,9 +2112,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testCommandStatusProcesses() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
Host hostObject = clusters.getHost(DummyHostname1);
clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName());
@@ -1957,7 +2124,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
HeartBeat hb = new HeartBeat();
hb.setTimestamp(System.currentTimeMillis());
@@ -1965,7 +2131,6 @@ public class TestHeartbeatHandler {
hb.setHostname(DummyHostname1);
hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
hb.setReports(new ArrayList<CommandReport>());
-
List<Map<String, String>> procs = new ArrayList<Map<String, String>>();
Map<String, String> proc1info = new HashMap<String, String>();
@@ -1992,7 +2157,18 @@ public class TestHeartbeatHandler {
componentStatus1.setExtra(extra);
componentStatuses.add(componentStatus1);
hb.setComponentStatus(componentStatuses);
-
+
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -2028,9 +2204,8 @@ public class TestHeartbeatHandler {
}
@Test
+ @SuppressWarnings("unchecked")
public void testCommandStatusProcesses_empty() throws Exception {
- ActionManager am = getMockActionManager();
-
Cluster cluster = getDummyCluster();
Host hostObject = clusters.getHost(DummyHostname1);
clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName());
@@ -2041,8 +2216,6 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
ActionQueue aq = new ActionQueue();
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
-
HeartBeat hb = new HeartBeat();
hb.setTimestamp(System.currentTimeMillis());
hb.setResponseId(0);
@@ -2060,9 +2233,19 @@ public class TestHeartbeatHandler {
componentStatuses.add(componentStatus1);
hb.setComponentStatus(componentStatuses);
-
- handler.handleHeartBeat(hb);
- ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+
+ final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(sch.getProcesses().size()));
}