You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/04/12 05:34:17 UTC
[3/3] ambari git commit: Revert "AMBARI-15671. On Ambari Agent
restart currently running commands on that agent should be immediately
aborted. (mpapirkovskyy)"
Revert "AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)"
This reverts commit 638b0a2672223636af19fcb3722eb4490c922f5e.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dd832a85
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dd832a85
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dd832a85
Branch: refs/heads/branch-2.2
Commit: dd832a85e95b9fdaf41d1604a7b25c4ad654d7d9
Parents: a8b8ef9
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Mon Apr 11 23:33:05 2016 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon Apr 11 23:33:05 2016 -0400
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 15 --
.../actionmanager/TestActionScheduler.java | 173 ++-----------------
2 files changed, 10 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd832a85/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index c05e50b..9d6b7d6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -688,15 +688,6 @@ class ActionScheduler implements Runnable {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
- } else if (wasAgentRestartedDuringOperation(hostObj, s, roleStr)) {
- String message = String.format("Detected ambari-agent restart during command execution." +
- "The command has been aborted." +
- "Execution command details: host: %s, role: %s, actionId: %s", host, roleStr, s.getActionId());
- LOG.warn(message);
- if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
- processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
- }
- status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
// Process command timeouts
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
@@ -881,12 +872,6 @@ class ActionScheduler implements Runnable {
return false;
}
- protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) {
- String hostName = (null == host) ? null : host.getHostName();
- long lastStageAttemptTime = stage.getLastAttemptTime(hostName, role);
- return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime();
- }
-
/**
* Determines if at least one task for a given hostname in the specified stage is in progress.
* <p/>
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd832a85/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 0ee0c27..af6fb9b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,9 +27,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -904,7 +902,6 @@ public class TestActionScheduler {
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- //Stage stage = mock(Stage.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -959,10 +956,8 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
@@ -1049,12 +1044,9 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf));
-
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+ unitOfWork, null, conf);
scheduler.doWork();
@@ -1123,11 +1115,9 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+ unitOfWork, null, conf);
scheduler.doWork();
@@ -2135,145 +2125,6 @@ public class TestActionScheduler {
Assert.assertEquals(cancelCommand.getReason(), reason);
}
- @Test
- public void testCancelRequestsDueAgentRestart() throws Exception {
- final long HOST_REGISTRATION_TIME = 100L;
- final long STAGE_LAST_ATTEMPT_TIME = HOST_REGISTRATION_TIME - 1;
- ActionQueue aq = new ActionQueue();
- Clusters fsm = mock(Clusters.class);
- Cluster oneClusterMock = mock(Cluster.class);
- Service serviceObj = mock(Service.class);
- ServiceComponent scomp = mock(ServiceComponent.class);
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
- UnitOfWork unitOfWork = mock(UnitOfWork.class);
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-
- HostEntity hostEntity = new HostEntity();
- hostEntity.setHostName(hostname);
- hostDAO.create(hostEntity);
-
- HashMap<String, ServiceComponentHost> hosts =
- new HashMap<String, ServiceComponentHost>();
- hosts.put(hostname, sch);
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-
- long requestId = 1;
-
- final List<Stage> stages = new ArrayList<Stage>();
- int namenodeCmdTaskId = 1;
- stages.add(
- getStageWithSingleTask(
- hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
- Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
- stages.add(
- getStageWithSingleTask(
- hostname, "cluster1", Role.DATANODE, RoleCommand.START,
- Service.Type.HDFS, 2, 2, (int)requestId));
-
- Host host = mock(Host.class);
- when(fsm.getHost(anyString())).thenReturn(host);
- when(host.getState()).thenReturn(HostState.HEALTHY);
- when(host.getHostName()).thenReturn(hostname);
- when(host.getLastRegistrationTime()).thenReturn(HOST_REGISTRATION_TIME);
-
- stages.get(0).setLastAttemptTime(host.getHostName(), Role.NAMENODE.toString(), STAGE_LAST_ATTEMPT_TIME);
- stages.get(1).setLastAttemptTime(host.getHostName(), Role.DATANODE.toString(), STAGE_LAST_ATTEMPT_TIME);
-
- ActionDBAccessor db = mock(ActionDBAccessor.class);
-
- RequestEntity request = mock(RequestEntity.class);
- when(request.isExclusive()).thenReturn(false);
- when(db.getRequestEntity(anyLong())).thenReturn(request);
-
- when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
-
- List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
- for (Stage stage : stages) {
- requestTasks.addAll(stage.getOrderedHostRoleCommands());
- }
- when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
- when(db.getAllStages(anyLong())).thenReturn(stages);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
- for (CommandReport report : reports) {
- String actionId = report.getActionId();
- long[] requestStageIds = StageUtils.getRequestStage(actionId);
- Long requestId = requestStageIds[0];
- Long stageId = requestStageIds[1];
- Long id = report.getTaskId();
- for (Stage stage : stages) {
- if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
- for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
- if (hostRoleCommand.getTaskId() == id) {
- hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
- }
- }
- }
- }
-
- }
-
- return null;
- }
- }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
-
- when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Long taskId = (Long) invocation.getArguments()[0];
- for (Stage stage : stages) {
- for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
- if (taskId.equals(command.getTaskId())) {
- return command;
- }
- }
- }
- return null;
- }
- });
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Long requestId = (Long) invocation.getArguments()[0];
- for (Stage stage : stages) {
- if (requestId.equals(stage.getRequestId())) {
- for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
- if (command.getStatus() == HostRoleStatus.QUEUED ||
- command.getStatus() == HostRoleStatus.IN_PROGRESS ||
- command.getStatus() == HostRoleStatus.PENDING) {
- command.setStatus(HostRoleStatus.ABORTED);
- }
- }
- }
- }
-
- return null;
- }
- }).when(db).abortOperation(anyLong());
-
- Properties properties = new Properties();
- Configuration conf = new Configuration(properties);
-
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf);
-
- scheduler.doWork();
-
- String reason = "Some reason";
-
- scheduler.scheduleCancellingRequest(requestId, reason);
-
- scheduler.doWork();
- Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
- Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
- }
@Test
public void testExclusiveRequests() throws Exception {
@@ -2415,10 +2266,8 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
// Execution of request 1
@@ -2615,10 +2464,8 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();