You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/04/12 05:34:17 UTC

[3/3] ambari git commit: Revert "AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)"

Revert "AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)"

This reverts commit 638b0a2672223636af19fcb3722eb4490c922f5e.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dd832a85
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dd832a85
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dd832a85

Branch: refs/heads/branch-2.2
Commit: dd832a85e95b9fdaf41d1604a7b25c4ad654d7d9
Parents: a8b8ef9
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Mon Apr 11 23:33:05 2016 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon Apr 11 23:33:05 2016 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  15 --
 .../actionmanager/TestActionScheduler.java      | 173 ++-----------------
 2 files changed, 10 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dd832a85/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index c05e50b..9d6b7d6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -688,15 +688,6 @@ class ActionScheduler implements Runnable {
             processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
           }
           status = HostRoleStatus.ABORTED;
-        } else if (wasAgentRestartedDuringOperation(hostObj, s, roleStr)) {
-          String message = String.format("Detected ambari-agent restart during command execution." +
-            "The command has been aborted." +
-            "Execution command details: host: %s, role: %s, actionId: %s", host, roleStr, s.getActionId());
-          LOG.warn(message);
-          if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
-            processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
-          }
-          status = HostRoleStatus.ABORTED;
         } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
           // Process command timeouts
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
@@ -881,12 +872,6 @@ class ActionScheduler implements Runnable {
     return false;
   }
 
-  protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) {
-    String hostName = (null == host) ? null : host.getHostName();
-    long lastStageAttemptTime = stage.getLastAttemptTime(hostName, role);
-    return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime();
-  }
-
   /**
    * Determines if at least one task for a given hostname in the specified stage is in progress.
    * <p/>

http://git-wip-us.apache.org/repos/asf/ambari/blob/dd832a85/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 0ee0c27..af6fb9b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,9 +27,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -904,7 +902,6 @@ public class TestActionScheduler {
     ServiceComponent scomp = mock(ServiceComponent.class);
     ServiceComponentHost sch = mock(ServiceComponentHost.class);
     UnitOfWork unitOfWork = mock(UnitOfWork.class);
-    //Stage stage = mock(Stage.class);
     when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
     when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -959,10 +956,8 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, null, conf));
-
-    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+            new HostsMap((String) null), unitOfWork, null, conf);
 
     scheduler.doWork();
 
@@ -1049,12 +1044,9 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
             new HostsMap((String) null),
-            unitOfWork, null, conf));
-
-
-    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+            unitOfWork, null, conf);
 
     scheduler.doWork();
 
@@ -1123,11 +1115,9 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf));
-
-    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+        unitOfWork, null, conf);
 
     scheduler.doWork();
 
@@ -2135,145 +2125,6 @@ public class TestActionScheduler {
     Assert.assertEquals(cancelCommand.getReason(), reason);
   }
 
-  @Test
-  public void testCancelRequestsDueAgentRestart() throws Exception {
-    final long HOST_REGISTRATION_TIME = 100L;
-    final long STAGE_LAST_ATTEMPT_TIME = HOST_REGISTRATION_TIME - 1;
-    ActionQueue aq = new ActionQueue();
-    Clusters fsm = mock(Clusters.class);
-    Cluster oneClusterMock = mock(Cluster.class);
-    Service serviceObj = mock(Service.class);
-    ServiceComponent scomp = mock(ServiceComponent.class);
-    ServiceComponentHost sch = mock(ServiceComponentHost.class);
-    UnitOfWork unitOfWork = mock(UnitOfWork.class);
-    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
-    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
-    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
-    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
-    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-
-    HostEntity hostEntity = new HostEntity();
-    hostEntity.setHostName(hostname);
-    hostDAO.create(hostEntity);
-
-    HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
-    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-
-    long requestId = 1;
-
-    final List<Stage> stages = new ArrayList<Stage>();
-    int namenodeCmdTaskId = 1;
-    stages.add(
-            getStageWithSingleTask(
-                    hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
-                    Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
-    stages.add(
-            getStageWithSingleTask(
-                    hostname, "cluster1", Role.DATANODE, RoleCommand.START,
-                    Service.Type.HDFS, 2, 2, (int)requestId));
-
-    Host host = mock(Host.class);
-    when(fsm.getHost(anyString())).thenReturn(host);
-    when(host.getState()).thenReturn(HostState.HEALTHY);
-    when(host.getHostName()).thenReturn(hostname);
-    when(host.getLastRegistrationTime()).thenReturn(HOST_REGISTRATION_TIME);
-
-    stages.get(0).setLastAttemptTime(host.getHostName(), Role.NAMENODE.toString(), STAGE_LAST_ATTEMPT_TIME);
-    stages.get(1).setLastAttemptTime(host.getHostName(), Role.DATANODE.toString(), STAGE_LAST_ATTEMPT_TIME);
-
-    ActionDBAccessor db = mock(ActionDBAccessor.class);
-
-    RequestEntity request = mock(RequestEntity.class);
-    when(request.isExclusive()).thenReturn(false);
-    when(db.getRequestEntity(anyLong())).thenReturn(request);
-
-    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
-    when(db.getStagesInProgress()).thenReturn(stages);
-
-    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
-    for (Stage stage : stages) {
-      requestTasks.addAll(stage.getOrderedHostRoleCommands());
-    }
-    when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
-    when(db.getAllStages(anyLong())).thenReturn(stages);
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
-        for (CommandReport report : reports) {
-          String actionId = report.getActionId();
-          long[] requestStageIds = StageUtils.getRequestStage(actionId);
-          Long requestId = requestStageIds[0];
-          Long stageId = requestStageIds[1];
-          Long id = report.getTaskId();
-          for (Stage stage : stages) {
-            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
-              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
-                if (hostRoleCommand.getTaskId() == id) {
-                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
-                }
-              }
-            }
-          }
-
-        }
-
-        return null;
-      }
-    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
-
-    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        Long taskId = (Long) invocation.getArguments()[0];
-        for (Stage stage : stages) {
-          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
-            if (taskId.equals(command.getTaskId())) {
-              return command;
-            }
-          }
-        }
-        return null;
-      }
-    });
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        Long requestId = (Long) invocation.getArguments()[0];
-        for (Stage stage : stages) {
-          if (requestId.equals(stage.getRequestId())) {
-            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
-              if (command.getStatus() == HostRoleStatus.QUEUED ||
-                      command.getStatus() == HostRoleStatus.IN_PROGRESS ||
-                      command.getStatus() == HostRoleStatus.PENDING) {
-                command.setStatus(HostRoleStatus.ABORTED);
-              }
-            }
-          }
-        }
-
-        return null;
-      }
-    }).when(db).abortOperation(anyLong());
-
-    Properties properties = new Properties();
-    Configuration conf = new Configuration(properties);
-
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
-
-    scheduler.doWork();
-
-    String reason = "Some reason";
-
-    scheduler.scheduleCancellingRequest(requestId, reason);
-
-    scheduler.doWork();
-    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
-    Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
-  }
 
   @Test
   public void testExclusiveRequests() throws Exception {
@@ -2415,10 +2266,8 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
 
-    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf));
-
-    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf);
 
     // Execution of request 1
 
@@ -2615,10 +2464,8 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
-    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf));
-
-    doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf);
 
     scheduler.doWork();