You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/09/18 22:20:25 UTC

ambari git commit: AMBARI-13145 - RU - Skipping failed task caused remaining pending tasks to be ABORTED (jonathanhurley)

Repository: ambari
Updated Branches:
  refs/heads/trunk 950616d48 -> 9dd623abb


AMBARI-13145 - RU - Skipping failed task caused remaining pending tasks to be ABORTED (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9dd623ab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9dd623ab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9dd623ab

Branch: refs/heads/trunk
Commit: 9dd623abb78e094bbf6ab5fcd4763cf2efa96c4b
Parents: 950616d
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Fri Sep 18 10:38:42 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Fri Sep 18 16:20:14 2015 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  83 ++++++-----
 .../actionmanager/TestActionScheduler.java      | 140 +++++++++++++++++++
 2 files changed, 187 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9dd623ab/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 7d93638..e752b05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -136,7 +136,7 @@ class ActionScheduler implements Runnable {
     actionTimeout = actionTimeoutMilliSec;
     this.db = db;
     this.actionQueue = actionQueue;
-    this.clusters = fsmObject;
+    clusters = fsmObject;
     this.ambariEventPublisher = ambariEventPublisher;
     this.maxAttempts = (short) maxAttempts;
     serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
@@ -287,30 +287,35 @@ class ActionScheduler implements Runnable {
         // Commands that will be scheduled in current scheduler wakeup
         List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
         Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule);
+
         // Check if stage is failed
         boolean failed = false;
-        for (Map.Entry<String, RoleStats>entry : roleStats.entrySet()) {
+        for (Map.Entry<String, RoleStats> entry : roleStats.entrySet()) {
 
-          String    role  = entry.getKey();
+          String role = entry.getKey();
           RoleStats stats = entry.getValue();
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Stats for role:" + role + ", stats=" + stats);
+            LOG.debug("Stats for role: {}, stats={}", role, stats);
           }
-          if (stats.isRoleFailed()) {
+
+          // only fail the request if the role failed and the stage is not
+          // skippable
+          if (stats.isRoleFailed() && !stage.isSkippable()) {
+            LOG.warn("{} failed, request {} will be aborted", role, request.getRequestId());
+
             failed = true;
             break;
           }
         }
 
-        if(!failed) {
+        if (!failed) {
           // Prior stage may have failed and it may need to fail the whole request
           failed = hasPreviousStageFailed(stage);
         }
 
         if (failed) {
-          LOG.warn("Operation completely failed, aborting request id:"
-              + stage.getRequestId());
+          LOG.warn("Operation completely failed, aborting request id: {}", stage.getRequestId());
           cancelHostRoleCommands(stage.getOrderedHostRoleCommands(), FAILED_TASK_ABORT_REASONING);
           abortOperationsForStage(stage);
           return;
@@ -989,34 +994,37 @@ class ActionScheduler implements Runnable {
 
   private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
     switch (status) {
-    case COMPLETED:
-      rs.numSucceeded++;
-      break;
-    case FAILED:
-      rs.numFailed++;
-      break;
-    case QUEUED:
-      rs.numQueued++;
-      break;
-    case PENDING:
-      rs.numPending++;
-      break;
-    case TIMEDOUT:
-      rs.numTimedOut++;
-      break;
-    case ABORTED:
-      rs.numAborted++;
-      break;
-    case IN_PROGRESS:
-      rs.numInProgress++;
-      break;
-    case HOLDING:
-    case HOLDING_FAILED:
-    case HOLDING_TIMEDOUT:
-      rs.numHolding++;
-      break;
-    default:
-      LOG.error("Unknown status " + status.name());
+      case COMPLETED:
+        rs.numSucceeded++;
+        break;
+      case FAILED:
+        rs.numFailed++;
+        break;
+      case QUEUED:
+        rs.numQueued++;
+        break;
+      case PENDING:
+        rs.numPending++;
+        break;
+      case TIMEDOUT:
+        rs.numTimedOut++;
+        break;
+      case ABORTED:
+        rs.numAborted++;
+        break;
+      case IN_PROGRESS:
+        rs.numInProgress++;
+        break;
+      case HOLDING:
+      case HOLDING_FAILED:
+      case HOLDING_TIMEDOUT:
+        rs.numHolding++;
+        break;
+      case SKIPPED_FAILED:
+        rs.numSkipped++;
+        break;
+      default:
+        LOG.error("Unknown status " + status.name());
     }
   }
 
@@ -1038,6 +1046,8 @@ class ActionScheduler implements Runnable {
     int numPending = 0;
     int numAborted = 0;
     int numHolding = 0;
+    int numSkipped = 0;
+
     final int totalHosts;
     final float successFactor;
 
@@ -1076,6 +1086,7 @@ class ActionScheduler implements Runnable {
       builder.append(", numTimedOut=").append(numTimedOut);
       builder.append(", numPending=").append(numPending);
       builder.append(", numAborted=").append(numAborted);
+      builder.append(", numSkipped=").append(numSkipped);
       builder.append(", totalHosts=").append(totalHosts);
       builder.append(", successFactor=").append(successFactor);
       return builder.toString();

http://git-wip-us.apache.org/repos/asf/ambari/blob/9dd623ab/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 31356bb..f8f9ce9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2258,6 +2258,146 @@ public class TestActionScheduler {
 
   }
 
+  /**
+   * Tests that command failures in skippable stages do not cause the request to
+   * be aborted.
+   */
+  @Test
+  public void testSkippableCommandFailureDoesNotAbortRequest() throws Exception {
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+
+    HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>();
+
+    hosts.put(hostname1, sch);
+
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    // create 1 stage with 2 commands and then another stage with 1 command
+    Stage stage = null;
+    Stage stage2 = null;
+    final List<Stage> stages = new ArrayList<Stage>();
+    stages.add(stage = getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE,
+        RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+    addInstallTaskToStage(stage, hostname1, "cluster1", Role.HBASE_MASTER, RoleCommand.INSTALL,
+        Service.Type.HBASE, 1);
+
+    stages.add(stage2 = getStageWithSingleTask(hostname1, "cluster1", Role.DATANODE,
+        RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+    // !!! this is the test; make the stages skippable so that when their
+    // commands fail, the entire request is not aborted
+    for (Stage stageToMakeSkippable : stages) {
+      stageToMakeSkippable.setSkippable(true);
+    }
+
+    // fail the first task - normally this would cause an abort, except that our stages
+    // are skippable now so it should not
+    HostRoleCommand command = stage.getOrderedHostRoleCommands().iterator().next();
+    command.setStatus(HostRoleStatus.FAILED);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    RequestEntity request = mock(RequestEntity.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+                if (hostRoleCommand.getTaskId() == id) {
+                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
+          }
+
+        }
+
+        return null;
+      }
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long taskId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+            if (taskId.equals(command.getTaskId())) {
+              return command;
+            }
+          }
+        }
+        return null;
+      }
+    });
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          if (requestId.equals(stage.getRequestId())) {
+            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+              if (command.getStatus() == HostRoleStatus.QUEUED
+                  || command.getStatus() == HostRoleStatus.IN_PROGRESS
+                  || command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.FAILED,
+        stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
+
+    // the remaining tasks should NOT have been aborted since the stage is
+    // skippable - these tasks would normally be ABORTED if the stage was not
+    // skippable
+    Assert.assertEquals(HostRoleStatus.QUEUED,
+        stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
+
+    Assert.assertEquals(HostRoleStatus.PENDING,
+        stages.get(1).getHostRoleStatus(hostname1, "DATANODE"));
+
+  }
 
   public static class MockModule extends AbstractModule {
     @Override