You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2016/04/01 17:44:08 UTC
ambari git commit: AMBARI-15671. On Ambari Agent restart currently
running commands on that agent should be immediately aborted. (mpapirkovskyy)
Repository: ambari
Updated Branches:
refs/heads/trunk f88f9f356 -> 67cd9d9ee
AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/67cd9d9e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/67cd9d9e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/67cd9d9e
Branch: refs/heads/trunk
Commit: 67cd9d9ee17f59adcec358c90bad515b2590d7d1
Parents: f88f9f3
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Fri Apr 1 18:12:55 2016 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Apr 1 18:12:55 2016 +0300
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 15 ++
.../actionmanager/TestActionScheduler.java | 173 +++++++++++++++++--
2 files changed, 178 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/67cd9d9e/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 95d1763..79e3a07 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -689,6 +689,17 @@ class ActionScheduler implements Runnable {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
+ } else if (wasAgentRestartedDuringOperation(hostObj, s, roleStr)) {
+ // AMBARI-15671: the agent re-registered at or after this command's last attempt, so abort it
+ // right away instead of leaving it to the timeout check in the next branch.
+ String message = String.format("Detected ambari-agent restart during command execution. " +
+ "The command has been aborted. " +
+ "Execution command details: host: %s, role: %s, actionId: %s", host, roleStr, s.getActionId());
+ LOG.warn(message);
+ if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
+ processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
+ }
+ status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
// Process command timeouts
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
@@ -873,6 +884,28 @@ class ActionScheduler implements Runnable {
return false;
}
+ /**
+ * Tells whether the agent on {@code host} (re-)registered at or after the last time the
+ * {@code role} command of {@code stage} was attempted on that host. The scheduler treats this
+ * as an agent restart during command execution and aborts the command.
+ *
+ * @param host host whose agent is checked; may be {@code null}
+ * @param stage stage that owns the command
+ * @param role role name of the command within the stage
+ * @return {@code true} if the command has been attempted (last attempt time &gt; 0) and that
+ * attempt is not later than the host's last registration time; {@code false} otherwise,
+ * including when {@code host} is {@code null}
+ */
+ protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) {
+ if (null == host) {
+ // No Host object means there is no registration time to compare against; report
+ // "no restart" instead of dereferencing null below.
+ return false;
+ }
+ long lastStageAttemptTime = stage.getLastAttemptTime(host.getHostName(), role);
+ return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime();
+ }
+
private boolean hasCommandInProgress(Stage stage, String host) {
List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/67cd9d9e/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index af6fb9b..0ee0c27 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,7 +27,9 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -902,6 +904,7 @@ public class TestActionScheduler {
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
+ //Stage stage = mock(Stage.class); // NOTE(review): commented-out dead code and the only change in this hunk - consider dropping it.
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -956,8 +959,10 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf);
+ ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf));
+ // Pin agent-restart detection to false so the AMBARI-15671 abort branch never fires in this test.
+ doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
scheduler.doWork();
@@ -1044,9 +1049,12 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf);
+ unitOfWork, null, conf));
+
+ // Pin agent-restart detection to false so the AMBARI-15671 abort branch never fires in this test.
+ doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
scheduler.doWork();
@@ -1115,9 +1123,11 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf);
+ unitOfWork, null, conf));
+ // Pin agent-restart detection to false so the AMBARI-15671 abort branch never fires in this test.
+ doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
scheduler.doWork();
@@ -2125,6 +2135,145 @@ public class TestActionScheduler {
Assert.assertEquals(cancelCommand.getReason(), reason);
}
+ @Test
+ public void testCancelRequestsDueAgentRestart() throws Exception {
+ // AMBARI-15671: commands last attempted before the agent host's latest registration must end up ABORTED.
+ final long HOST_REGISTRATION_TIME = 100L;
+ final long STAGE_LAST_ATTEMPT_TIME = HOST_REGISTRATION_TIME - 1;
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+ HostEntity hostEntity = new HostEntity();
+ hostEntity.setHostName(hostname);
+ hostDAO.create(hostEntity);
+
+ HashMap<String, ServiceComponentHost> hosts =
+ new HashMap<String, ServiceComponentHost>();
+ hosts.put(hostname, sch);
+ when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+ long requestId = 1;
+
+ final List<Stage> stages = new ArrayList<Stage>();
+ int namenodeCmdTaskId = 1;
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
+ Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.DATANODE, RoleCommand.START,
+ Service.Type.HDFS, 2, 2, (int)requestId));
+
+ Host host = mock(Host.class);
+ when(fsm.getHost(anyString())).thenReturn(host);
+ when(host.getState()).thenReturn(HostState.HEALTHY);
+ when(host.getHostName()).thenReturn(hostname);
+ when(host.getLastRegistrationTime()).thenReturn(HOST_REGISTRATION_TIME);
+
+ stages.get(0).setLastAttemptTime(host.getHostName(), Role.NAMENODE.toString(), STAGE_LAST_ATTEMPT_TIME);
+ stages.get(1).setLastAttemptTime(host.getHostName(), Role.DATANODE.toString(), STAGE_LAST_ATTEMPT_TIME);
+
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+ RequestEntity request = mock(RequestEntity.class);
+ when(request.isExclusive()).thenReturn(false);
+ when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+ when(db.getStagesInProgress()).thenReturn(stages);
+
+ List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+ for (Stage stage : stages) {
+ requestTasks.addAll(stage.getOrderedHostRoleCommands());
+ }
+ when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
+ when(db.getAllStages(anyLong())).thenReturn(stages);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+ for (CommandReport report : reports) {
+ String actionId = report.getActionId();
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
+ Long requestId = requestStageIds[0];
+ Long stageId = requestStageIds[1];
+ Long id = report.getTaskId();
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+ for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+ if (hostRoleCommand.getTaskId() == id) {
+ hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+ }
+ }
+ }
+ }
+
+ }
+
+ return null;
+ }
+ }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+ when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Long taskId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (taskId.equals(command.getTaskId())) {
+ return command;
+ }
+ }
+ }
+ return null;
+ }
+ });
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Long requestId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId())) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (command.getStatus() == HostRoleStatus.QUEUED ||
+ command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+ command.getStatus() == HostRoleStatus.PENDING) {
+ command.setStatus(HostRoleStatus.ABORTED);
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ }).when(db).abortOperation(anyLong());
+
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
+ // NOTE(review): the final ABORTED checks would also pass through the explicit cancellation below,
+ // so assert directly that restart detection fires for both commands before anything is scheduled.
+ Assert.assertTrue(scheduler.wasAgentRestartedDuringOperation(host, stages.get(0), Role.NAMENODE.toString()));
+ Assert.assertTrue(scheduler.wasAgentRestartedDuringOperation(host, stages.get(1), Role.DATANODE.toString()));
+
+ scheduler.doWork();
+ String reason = "Some reason";
+ scheduler.scheduleCancellingRequest(requestId, reason);
+ scheduler.doWork();
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+ }
@Test
public void testExclusiveRequests() throws Exception {
@@ -2266,8 +2415,10 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf);
+ ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf));
+ // Pin agent-restart detection to false so the AMBARI-15671 abort branch never fires in this test.
+ doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
// Execution of request 1
@@ -2464,8 +2615,10 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf);
+ ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf));
+ // Pin agent-restart detection to false so the AMBARI-15671 abort branch never fires in this test.
+ doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
scheduler.doWork();