You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by jl...@apache.org on 2016/03/17 20:07:25 UTC
tez git commit: TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)
Repository: tez
Updated Branches:
refs/heads/master 8e969abf9 -> 191447e09
TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/191447e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/191447e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/191447e0
Branch: refs/heads/master
Commit: 191447e092e0432ebbc521113813e138c3e55818
Parents: 8e969ab
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Mar 17 19:00:46 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Mar 17 19:00:46 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +++---
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 26 +++++++++++---------
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 7 +++---
4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 78228c4..5fd3856 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state
TEZ-2936. Create ATS implementation that enables support for YARN-4265 (ATS v1.5)
TEZ-3148. Invalid event TA_TEZ_EVENT_UPDATE on TaskAttempt.
TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures.
@@ -404,6 +405,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state
TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures.
TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
TEZ-3140. Reduce AM memory usage during serialization
http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6f44f3d..702c323 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -79,7 +79,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -172,7 +171,7 @@ public class TaskAttemptImpl implements TaskAttempt,
private final Lock writeLock;
protected final AppContext appContext;
private final TaskHeartbeatHandler taskHeartbeatHandler;
- private final TaskAttemptRecoveryData recoveryData;
+ private TaskAttemptRecoveryData recoveryData;
private long launchTime = 0;
private long finishTime = 0;
private String trackerName;
@@ -1663,7 +1662,10 @@ public class TaskAttemptImpl implements TaskAttempt,
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
- if (attempt.leafVertex) {
+ boolean fromRecovery = (event instanceof RecoveryEvent
+ && ((RecoveryEvent) event).isFromRecovery());
+ attempt.recoveryData = null;
+ if (!fromRecovery && attempt.leafVertex) {
return TaskAttemptStateInternal.SUCCEEDED;
}
// TODO - TEZ-834. This assumes that the outputs were on that node
http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index bdadf3f..32869a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -982,10 +984,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private static class AttemptSucceededTransition
implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
- private boolean recoverSuccessTaskAttempt(TaskImpl task) {
+ private String recoverSuccessTaskAttempt(TaskImpl task) {
// Found successful attempt
// Recover data
- boolean recoveredData = true;
+ String errorMsg = null;
if (task.getVertex().getOutputCommitters() != null
&& !task.getVertex().getOutputCommitters().isEmpty()) {
for (Entry<String, OutputCommitter> entry
@@ -995,28 +997,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
+ ", output=" + entry.getKey());
OutputCommitter committer = entry.getValue();
if (!committer.isTaskRecoverySupported()) {
- LOG.info("Task recovery not supported by committer"
- + ", failing task attempt"
+ errorMsg = "Task recovery not supported by committer"
+ + ", failing task attempt";
+ LOG.info(errorMsg
+ ", taskId=" + task.getTaskId()
+ ", attemptId=" + task.successfulAttempt
+ ", output=" + entry.getKey());
- recoveredData = false;
break;
}
try {
committer.recoverTask(task.getTaskId().getId(),
task.appContext.getApplicationAttemptId().getAttemptId()-1);
} catch (Exception e) {
+ errorMsg = "Task recovery failed by committer: "
+ + ExceptionUtils.getStackTrace(e);
LOG.warn("Task recovery failed by committer"
+ ", taskId=" + task.getTaskId()
+ ", attemptId=" + task.successfulAttempt
+ ", output=" + entry.getKey(), e);
- recoveredData = false;
break;
}
}
}
- return recoveredData;
+ return errorMsg;
}
@Override
@@ -1026,13 +1029,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// recovery. In that case just reschedule new attempt if numFailedAttempts does not exceeded maxFailedAttempts.
if (task.recoveryData!= null
&& task.recoveryData.isTaskAttemptSucceeded(successTaId)) {
- boolean recoveredData = recoverSuccessTaskAttempt(task);
- if (!recoveredData) {
- // Move this TA to KILLED (TEZ-2958)
- LOG.info("Can not recovery the successful task attempt, schedule new task attempt,"
+ String errorMsg = recoverSuccessTaskAttempt(task);
+ if (errorMsg != null) {
+ LOG.info("Can not recover the successful task attempt, schedule new task attempt,"
+ "taskId=" + task.getTaskId());
task.successfulAttempt = null;
task.addAndScheduleAttempt(successTaId);
+ task.eventHandler.handle(new TaskAttemptEventAttemptKilled(successTaId,
+ errorMsg, TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true));
return TaskStateInternal.RUNNING;
} else {
task.successfulAttempt = successTaId;
http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 0a2613c..0b0af7b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -321,6 +321,7 @@ public class TestDAGRecovery {
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
+ when(appContext.getClock()).thenReturn(new SystemClock());
Mockito.doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
@@ -1052,7 +1053,7 @@ public class TestDAGRecovery {
* V2's committer is not recovery supported
*/
@Test//(timeout=5000)
- public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() {
+ public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() throws Exception{
initMockDAGRecoveryDataForTaskAttempt();
// set up v2 recovery data
// ta1t1v2: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED)
@@ -1097,8 +1098,8 @@ public class TestDAGRecovery {
TaskImpl task = (TaskImpl)dag.getVertex(v2Id).getTask(t1v2Id);
TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v2Id);
- assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
- historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+ historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
// new task attempt is scheduled
assertEquals(2, task.getAttempts().size());