You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/04 06:25:54 UTC
[tez] branch branch-0.9 updated: TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new b15f658 TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)
b15f658 is described below
commit b15f658761c38d9c8da449d2e40425e6ff4cfac4
Author: László Bodor <bo...@gmail.com>
AuthorDate: Tue Aug 4 08:17:54 2020 +0200
TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../org/apache/tez/dag/api/TezConfiguration.java | 10 +++++
.../java/org/apache/tez/dag/app/dag/Vertex.java | 1 +
.../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 49 +++++++++++++++------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 8 ++++
.../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 51 ++++++++++++++++++----
5 files changed, 97 insertions(+), 22 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 90aa76e..8410f60 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -695,6 +695,16 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4;
/**
+ * Int value. The maximum number of attempts that can run for a particular task before the task is
+ * failed. This counts every attempt, including failed and killed attempts. Task failure results in
+ * DAG failure. Default is 0, which disables this feature.
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type = "integer")
+ public static final String TEZ_AM_TASK_MAX_ATTEMPTS = TEZ_AM_PREFIX + "task.max.attempts";
+ public static final int TEZ_AM_TASK_MAX_ATTEMPTS_DEFAULT = 0;
+
+ /**
* Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous
* failures gets higher priority
*/
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f3ef72b..ba3079d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -214,6 +214,7 @@ public interface Vertex extends Comparable<Vertex> {
interface VertexConfig {
int getMaxFailedTaskAttempts();
+ int getMaxTaskAttempts();
boolean getTaskRescheduleHigherPriority();
boolean getTaskRescheduleRelaxedLocality();
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3470216..0b4b116 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -126,6 +126,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private final TezTaskID taskId;
private Map<TezTaskAttemptID, TaskAttempt> attempts;
protected final int maxFailedAttempts;
+ protected final int maxAttempts; // overall max number of attempts (including killed/preempted task attempts)
protected final Clock clock;
private final Vertex vertex;
private final Lock readLock;
@@ -159,7 +160,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Set<NodeId> nodesWithRunningAttempts = Collections
.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
- private static final SingleArcTransition<TaskImpl , TaskEvent>
+ private static final MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
KILL_TRANSITION = new KillTransition();
@@ -193,7 +194,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_TERMINATE,
KILL_TRANSITION)
- .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
+ .addTransition(TaskStateInternal.SCHEDULED, EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
@@ -216,7 +217,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededTransition())
- .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING, EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_KILLED,
ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.RUNNING,
@@ -380,6 +381,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
maxFailedAttempts = vertex.getVertexConfig().getMaxFailedTaskAttempts();
+ maxAttempts = vertex.getVertexConfig().getMaxTaskAttempts();
taskId = TezTaskID.getInstance(vertexId, taskIndex);
this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
@@ -776,7 +778,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
// This is always called in the Write Lock
- private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
+ private boolean addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
@@ -794,13 +796,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
attempt.getID() + " already existed");
break;
-
default:
Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
attempt.getID() + " already existed");
break;
}
+ if (maxAttempts > 0 && attempts.size() == maxAttempts) {
+ TaskImpl task = (TaskImpl) attempt.getTask();
+ LOG.error("Cannot schedule new attempt for task as max number of attempts ({}) reached: {}",
+ maxAttempts, task);
+
+ task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
+ task.eventHandler.handle(new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
+
+ return false;
+ }
+
// TODO: Recovery
/*
// Update nextATtemptNumber
@@ -819,7 +831,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// picture in mind
eventHandler.handle(new DAGEventSchedulerUpdate(
DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, attempt));
-
+ return true;
}
@Override
@@ -1007,7 +1019,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.locationHint = scheduleEvent.getTaskLocationHint();
task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
// For now, initial scheduling dependency is due to vertex manager scheduling
- task.addAndScheduleAttempt(null);
+ if (!task.addAndScheduleAttempt(null)) {
+ return TaskStateInternal.FAILED;
+ }
return TaskStateInternal.SCHEDULED;
}
}
@@ -1108,7 +1122,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.info("Can not recover the successful task attempt, schedule new task attempt,"
+ "taskId=" + task.getTaskId());
task.successfulAttempt = null;
- task.addAndScheduleAttempt(successTaId);
+ if (!task.addAndScheduleAttempt(successTaId)) {
+ task.finished(TaskStateInternal.FAILED);
+ }
task.eventHandler.handle(new TaskAttemptEventAttemptKilled(successTaId,
errorMsg, TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true));
return TaskStateInternal.RUNNING;
@@ -1167,9 +1183,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private static class AttemptKilledTransition implements
- SingleArcTransition<TaskImpl, TaskEvent> {
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
- public void transition(TaskImpl task, TaskEvent event) {
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ TaskStateInternal originalState = task.getInternalState();
+
TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " killed");
if (task.commitAttempt !=null &&
@@ -1197,8 +1215,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.getVertex().incrementKilledTaskAttemptCount();
}
if (task.shouldScheduleNewAttempt()) {
- task.addAndScheduleAttempt(castEvent.getTaskAttemptID());
+ if (!task.addAndScheduleAttempt(castEvent.getTaskAttemptID())) {
+ return task.finished(TaskStateInternal.FAILED);
+ }
}
+ return originalState;
}
}
@@ -1258,8 +1279,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.shouldScheduleNewAttempt()) {
LOG.info("Scheduling new attempt for task: " + task.getTaskId()
+ ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
- + task.maxFailedAttempts);
- task.addAndScheduleAttempt(getSchedulingCausalTA());
+ + task.maxFailedAttempts + ", maxAttempts: " + task.maxAttempts);
+ if (!task.addAndScheduleAttempt(getSchedulingCausalTA())){
+ return task.finished(TaskStateInternal.FAILED);
+ }
}
} else {
if (castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 536d51d..1e8b1c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4841,6 +4841,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
static class VertexConfigImpl implements VertexConfig {
private final int maxFailedTaskAttempts;
+ private final int maxTaskAttempts;
private final boolean taskRescheduleHigherPriority;
private final boolean taskRescheduleRelaxedLocality;
@@ -4860,6 +4861,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
public VertexConfigImpl(Configuration conf) {
this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+ this.maxTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS_DEFAULT);
this.taskRescheduleHigherPriority =
conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY,
TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT);
@@ -4886,6 +4889,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
@Override
+ public int getMaxTaskAttempts() {
+ return maxTaskAttempts;
+ }
+
+ @Override
public boolean getTaskRescheduleHigherPriority() {
return taskRescheduleHigherPriority;
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1af6092..a28e786 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -286,10 +286,15 @@ public class TestTaskImpl {
attempt.setState(s);
}
- private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
+ private void killRunningTaskAttempt(TezTaskAttemptID attemptId, TaskState stateToVerify) {
+ killRunningTaskAttempt(attemptId, stateToVerify, 1);
+ }
+
+ private void killRunningTaskAttempt(TezTaskAttemptID attemptId, TaskState stateToVerify,
+ int killedCountToVerify) {
mockTask.handle(createTaskTAKilledEvent(attemptId));
- assertTaskRunningState();
- verify(mockTask.getVertex(), times(1)).incrementKilledTaskAttemptCount();
+ assertTaskState(stateToVerify);
+ verify(mockTask.getVertex(), times(killedCountToVerify)).incrementKilledTaskAttemptCount();
}
private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
@@ -310,21 +315,25 @@ public class TestTaskImpl {
* {@link TaskState#NEW}
*/
private void assertTaskNewState() {
- assertEquals(TaskState.NEW, mockTask.getState());
+ assertTaskState(TaskState.NEW);
}
/**
* {@link TaskState#SCHEDULED}
*/
private void assertTaskScheduledState() {
- assertEquals(TaskState.SCHEDULED, mockTask.getState());
+ assertTaskState(TaskState.SCHEDULED);
}
/**
* {@link TaskState#RUNNING}
*/
private void assertTaskRunningState() {
- assertEquals(TaskState.RUNNING, mockTask.getState());
+ assertTaskState(TaskState.RUNNING);
+ }
+
+ private void assertTaskState(TaskState state) {
+ assertEquals(state, mockTask.getState());
}
/**
@@ -385,7 +394,7 @@ public class TestTaskImpl {
}
@Test(timeout = 5000)
- public void testTooManyFailedAtetmpts() {
+ public void testTooManyFailedAttempts() {
LOG.info("--- START: testTooManyFailedAttempts ---");
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId, TaskState.SCHEDULED);
@@ -409,6 +418,30 @@ public class TestTaskImpl {
}
@Test(timeout = 5000)
+ public void testTooManyAttempts() {
+ LOG.info("--- START: testTooManyAttempts ---");
+
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS, 3);
+ Vertex vertex = mock(Vertex.class);
+ doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, vertex);
+
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId, TaskState.SCHEDULED); // attempt_0
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING, 1); // attempt_1
+
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.FAILED, 2); // attempt_2 -> reached 3
+
+ assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+ verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
+ }
+
+ @Test(timeout = 5000)
public void testFailedAttemptWithFatalError() {
LOG.info("--- START: testFailedAttemptWithFatalError ---");
TezTaskID taskId = getNewTaskID();
@@ -428,7 +461,7 @@ public class TestTaskImpl {
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(mockTask.getLastAttempt().getID());
- killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+ killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING);
assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
killTask(taskId);
mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
@@ -503,7 +536,7 @@ public class TestTaskImpl {
scheduleTaskAttempt(taskId);
TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
launchTaskAttempt(mockTask.getLastAttempt().getID());
- killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+ killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING);
// last killed attempt should be causal TA of next attempt
Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
}