You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2016/04/01 17:44:08 UTC

ambari git commit: AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)

Repository: ambari
Updated Branches:
  refs/heads/trunk f88f9f356 -> 67cd9d9ee


AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/67cd9d9e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/67cd9d9e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/67cd9d9e

Branch: refs/heads/trunk
Commit: 67cd9d9ee17f59adcec358c90bad515b2590d7d1
Parents: f88f9f3
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Fri Apr 1 18:12:55 2016 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Apr 1 18:12:55 2016 +0300

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  15 ++
 .../actionmanager/TestActionScheduler.java      | 173 +++++++++++++++++--
 2 files changed, 178 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/67cd9d9e/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 95d1763..79e3a07 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -689,6 +689,15 @@ class ActionScheduler implements Runnable {
             processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
           }
           status = HostRoleStatus.ABORTED;
+        } else if (wasAgentRestartedDuringOperation(hostObj, s, roleStr)) {
+          String message = String.format("Detected ambari-agent restart during command execution." +
+            "The command has been aborted." +
+            "Execution command details: host: %s, role: %s, actionId: %s", host, roleStr, s.getActionId());
+          LOG.warn(message);
+          if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
+            processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
+          }
+          status = HostRoleStatus.ABORTED;
         } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
           // Process command timeouts
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
@@ -873,6 +882,12 @@ class ActionScheduler implements Runnable {
     return false;
   }
 
+  protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) {
+    // Restarted = command attempted (> 0) at or before the agent's last registration; a null host gives 0, hence false.
+    long lastStageAttemptTime = (null == host) ? 0L : stage.getLastAttemptTime(host.getHostName(), role);
+    return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime();
+  }
+
   private boolean hasCommandInProgress(Stage stage, String host) {
     List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host);
     for (ExecutionCommandWrapper wrapper : commandWrappers) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/67cd9d9e/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index af6fb9b..0ee0c27 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,7 +27,9 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -902,6 +904,7 @@ public class TestActionScheduler {
     ServiceComponent scomp = mock(ServiceComponent.class);
     ServiceComponentHost sch = mock(ServiceComponentHost.class);
     UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    //Stage stage = mock(Stage.class);
     when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
     when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -956,8 +959,10 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+            new HostsMap((String) null), unitOfWork, null, conf));
+
+    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -1044,9 +1049,12 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
             new HostsMap((String) null),
-            unitOfWork, null, conf);
+            unitOfWork, null, conf));
+
+
+    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -1115,9 +1123,11 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
+        unitOfWork, null, conf));
+
+    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -2125,6 +2135,145 @@ public class TestActionScheduler {
     Assert.assertEquals(cancelCommand.getReason(), reason);
   }
 
+  @Test // Expects both stage commands to be ABORTED when their last attempt time precedes the host's last registration time.
+  public void testCancelRequestsDueAgentRestart() throws Exception {
+    final long HOST_REGISTRATION_TIME = 100L;
+    final long STAGE_LAST_ATTEMPT_TIME = HOST_REGISTRATION_TIME - 1; // attempt happened just before the registration
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    HostEntity hostEntity = new HostEntity();
+    hostEntity.setHostName(hostname);
+    hostDAO.create(hostEntity); // hostDAO and hostname are declared outside this method
+
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    long requestId = 1;
+
+    final List<Stage> stages = new ArrayList<Stage>(); // two single-task stages (NAMENODE, DATANODE) in the same request
+    int namenodeCmdTaskId = 1;
+    stages.add(
+            getStageWithSingleTask(
+                    hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
+                    Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
+    stages.add(
+            getStageWithSingleTask(
+                    hostname, "cluster1", Role.DATANODE, RoleCommand.START,
+                    Service.Type.HDFS, 2, 2, (int)requestId));
+
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(hostname);
+    when(host.getLastRegistrationTime()).thenReturn(HOST_REGISTRATION_TIME);
+
+    stages.get(0).setLastAttemptTime(host.getHostName(), Role.NAMENODE.toString(), STAGE_LAST_ATTEMPT_TIME); // stamp the pre-registration attempt time
+    stages.get(1).setLastAttemptTime(host.getHostName(), Role.DATANODE.toString(), STAGE_LAST_ATTEMPT_TIME);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    RequestEntity request = mock(RequestEntity.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
+    for (Stage stage : stages) {
+      requestTasks.addAll(stage.getOrderedHostRoleCommands());
+    }
+    when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
+    when(db.getAllStages(anyLong())).thenReturn(stages);
+    doAnswer(new Answer<Void>() { // updateHostRoleStates stub: copy each reported status onto the matching in-memory command
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0]; // unchecked cast of the stubbed call's first argument
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+                if (hostRoleCommand.getTaskId() == id) { // NOTE(review): == against a boxed Long; value comparison only if getTaskId() returns primitive long - confirm
+                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
+          }
+
+        }
+
+        return null;
+      }
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { // getTask stub: find the command with the requested task id, else null
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long taskId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+            if (taskId.equals(command.getTaskId())) {
+              return command;
+            }
+          }
+        }
+        return null;
+      }
+    });
+    doAnswer(new Answer<Void>() { // abortOperation stub: mark QUEUED/IN_PROGRESS/PENDING commands of the request as ABORTED
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          if (requestId.equals(stage.getRequestId())) {
+            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+              if (command.getStatus() == HostRoleStatus.QUEUED ||
+                      command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+                      command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, // not wrapped in spy(), so wasAgentRestartedDuringOperation is not stubbed here
+        new HostsMap((String) null), unitOfWork, null, conf);
+
+    scheduler.doWork();
+
+    String reason = "Some reason";
+
+    scheduler.scheduleCancellingRequest(requestId, reason); // NOTE(review): an explicit cancel may itself produce ABORTED via the abortOperation stub, masking the restart path - confirm
+
+    scheduler.doWork();
+    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+  }
 
   @Test
   public void testExclusiveRequests() throws Exception {
@@ -2266,8 +2415,10 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
 
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf));
+
+    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     // Execution of request 1
 
@@ -2464,8 +2615,10 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf));
+
+    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
     scheduler.doWork();