You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/09/18 22:20:25 UTC
ambari git commit: AMBARI-13145 - RU - Skipping failed task caused
remaining pending tasks to be ABORTED (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/trunk 950616d48 -> 9dd623abb
AMBARI-13145 - RU - Skipping failed task caused remaining pending tasks to be ABORTED (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9dd623ab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9dd623ab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9dd623ab
Branch: refs/heads/trunk
Commit: 9dd623abb78e094bbf6ab5fcd4763cf2efa96c4b
Parents: 950616d
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Fri Sep 18 10:38:42 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Fri Sep 18 16:20:14 2015 -0400
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 83 ++++++-----
.../actionmanager/TestActionScheduler.java | 140 +++++++++++++++++++
2 files changed, 187 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9dd623ab/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 7d93638..e752b05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -136,7 +136,7 @@ class ActionScheduler implements Runnable {
actionTimeout = actionTimeoutMilliSec;
this.db = db;
this.actionQueue = actionQueue;
- this.clusters = fsmObject;
+ clusters = fsmObject;
this.ambariEventPublisher = ambariEventPublisher;
this.maxAttempts = (short) maxAttempts;
serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
@@ -287,30 +287,35 @@ class ActionScheduler implements Runnable {
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule);
+
// Check if stage is failed
boolean failed = false;
- for (Map.Entry<String, RoleStats>entry : roleStats.entrySet()) {
+ for (Map.Entry<String, RoleStats> entry : roleStats.entrySet()) {
- String role = entry.getKey();
+ String role = entry.getKey();
RoleStats stats = entry.getValue();
if (LOG.isDebugEnabled()) {
- LOG.debug("Stats for role:" + role + ", stats=" + stats);
+ LOG.debug("Stats for role: {}, stats={}", role, stats);
}
- if (stats.isRoleFailed()) {
+
+ // only fail the request if the role failed and the stage is not
+ // skippable
+ if (stats.isRoleFailed() && !stage.isSkippable()) {
+ LOG.warn("{} failed, request {} will be aborted", role, request.getRequestId());
+
failed = true;
break;
}
}
- if(!failed) {
+ if (!failed) {
// Prior stage may have failed and it may need to fail the whole request
failed = hasPreviousStageFailed(stage);
}
if (failed) {
- LOG.warn("Operation completely failed, aborting request id:"
- + stage.getRequestId());
+ LOG.warn("Operation completely failed, aborting request id: {}", stage.getRequestId());
cancelHostRoleCommands(stage.getOrderedHostRoleCommands(), FAILED_TASK_ABORT_REASONING);
abortOperationsForStage(stage);
return;
@@ -989,34 +994,37 @@ class ActionScheduler implements Runnable {
private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
switch (status) {
- case COMPLETED:
- rs.numSucceeded++;
- break;
- case FAILED:
- rs.numFailed++;
- break;
- case QUEUED:
- rs.numQueued++;
- break;
- case PENDING:
- rs.numPending++;
- break;
- case TIMEDOUT:
- rs.numTimedOut++;
- break;
- case ABORTED:
- rs.numAborted++;
- break;
- case IN_PROGRESS:
- rs.numInProgress++;
- break;
- case HOLDING:
- case HOLDING_FAILED:
- case HOLDING_TIMEDOUT:
- rs.numHolding++;
- break;
- default:
- LOG.error("Unknown status " + status.name());
+ case COMPLETED:
+ rs.numSucceeded++;
+ break;
+ case FAILED:
+ rs.numFailed++;
+ break;
+ case QUEUED:
+ rs.numQueued++;
+ break;
+ case PENDING:
+ rs.numPending++;
+ break;
+ case TIMEDOUT:
+ rs.numTimedOut++;
+ break;
+ case ABORTED:
+ rs.numAborted++;
+ break;
+ case IN_PROGRESS:
+ rs.numInProgress++;
+ break;
+ case HOLDING:
+ case HOLDING_FAILED:
+ case HOLDING_TIMEDOUT:
+ rs.numHolding++;
+ break;
+ case SKIPPED_FAILED:
+ rs.numSkipped++;
+ break;
+ default:
+ LOG.error("Unknown status " + status.name());
}
}
@@ -1038,6 +1046,8 @@ class ActionScheduler implements Runnable {
int numPending = 0;
int numAborted = 0;
int numHolding = 0;
+ int numSkipped = 0;
+
final int totalHosts;
final float successFactor;
@@ -1076,6 +1086,7 @@ class ActionScheduler implements Runnable {
builder.append(", numTimedOut=").append(numTimedOut);
builder.append(", numPending=").append(numPending);
builder.append(", numAborted=").append(numAborted);
+ builder.append(", numSkipped=").append(numSkipped);
builder.append(", totalHosts=").append(totalHosts);
builder.append(", successFactor=").append(successFactor);
return builder.toString();
http://git-wip-us.apache.org/repos/asf/ambari/blob/9dd623ab/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 31356bb..f8f9ce9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2258,6 +2258,146 @@ public class TestActionScheduler {
}
+ /**
+ * Tests that command failures in skippable stages do not cause the request to
+ * be aborted.
+ */
+ @Test
+ public void testSkippableCommandFailureDoesNotAbortRequest() throws Exception {
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+ String hostname1 = "ahost.ambari.apache.org";
+
+ HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>();
+
+ hosts.put(hostname1, sch);
+
+ when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+ // create 1 stage with 2 commands (NAMENODE stop, HBASE_MASTER install) and then another stage with 1 command (DATANODE stop)
+ Stage stage = null;
+ Stage stage2 = null;
+ final List<Stage> stages = new ArrayList<Stage>();
+ stages.add(stage = getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE,
+ RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+ addInstallTaskToStage(stage, hostname1, "cluster1", Role.HBASE_MASTER, RoleCommand.INSTALL,
+ Service.Type.HBASE, 1);
+
+ stages.add(stage2 = getStageWithSingleTask(hostname1, "cluster1", Role.DATANODE,
+ RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+ // !!! this is the test; make the stages skippable so that when their
+ // commands fail, the entire request is not aborted
+ for (Stage stageToMakeSkippable : stages) {
+ stageToMakeSkippable.setSkippable(true);
+ }
+
+ // fail the first task - normally this would cause an abort, except that our stages
+ // are skippable now so it should not
+ HostRoleCommand command = stage.getOrderedHostRoleCommands().iterator().next();
+ command.setStatus(HostRoleStatus.FAILED);
+
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+ RequestEntity request = mock(RequestEntity.class);
+ when(request.isExclusive()).thenReturn(false);
+ when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+ when(db.getStagesInProgress()).thenReturn(stages);
+
+ doAnswer(new Answer<Void>() { // fake DB: apply each reported status to the matching in-memory command
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
+ for (CommandReport report : reports) {
+ String actionId = report.getActionId();
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
+ Long requestId = requestStageIds[0];
+ Long stageId = requestStageIds[1];
+ Long id = report.getTaskId();
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
+ for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+ if (hostRoleCommand.getTaskId() == id) {
+ hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+ }
+ }
+ }
+ }
+
+ }
+
+ return null;
+ }
+ }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+ when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { // fake DB: look up a command by task id across all stages
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Long taskId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (taskId.equals(command.getTaskId())) {
+ return command;
+ }
+ }
+ }
+ return null;
+ }
+ });
+ doAnswer(new Answer<Void>() { // fake DB abort: QUEUED/IN_PROGRESS/PENDING commands of the request become ABORTED
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Long requestId = (Long) invocation.getArguments()[0];
+ for (Stage stage : stages) {
+ if (requestId.equals(stage.getRequestId())) {
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+ if (command.getStatus() == HostRoleStatus.QUEUED
+ || command.getStatus() == HostRoleStatus.IN_PROGRESS
+ || command.getStatus() == HostRoleStatus.PENDING) {
+ command.setStatus(HostRoleStatus.ABORTED);
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ }).when(db).abortOperation(anyLong());
+
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
+
+ scheduler.doWork();
+
+ Assert.assertEquals(HostRoleStatus.FAILED,
+ stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
+
+ // the remaining tasks should NOT have been aborted since the stage is
+ // skippable - these tasks would normally be ABORTED if the stage was not
+ // skippable
+ Assert.assertEquals(HostRoleStatus.QUEUED,
+ stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
+
+ Assert.assertEquals(HostRoleStatus.PENDING,
+ stages.get(1).getHostRoleStatus(hostname1, "DATANODE"));
+
+ }
public static class MockModule extends AbstractModule {
@Override