You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@tez.apache.org by jl...@apache.org on 2016/03/17 20:07:25 UTC

tez git commit: TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master 8e969abf9 -> 191447e09


TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/191447e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/191447e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/191447e0

Branch: refs/heads/master
Commit: 191447e092e0432ebbc521113813e138c3e55818
Parents: 8e969ab
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Mar 17 19:00:46 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Mar 17 19:00:46 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  8 +++---
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 26 +++++++++++---------
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  7 +++---
 4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 78228c4..5fd3856 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state
   TEZ-2936. Create ATS implementation that enables support for YARN-4265 (ATS v1.5)
   TEZ-3148. Invalid event TA_TEZ_EVENT_UPDATE on TaskAttempt.
   TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures.
@@ -404,6 +405,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state
   TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures.
   TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
   TEZ-3140. Reduce AM memory usage during serialization

http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6f44f3d..702c323 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -79,7 +79,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -172,7 +171,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Lock writeLock;
   protected final AppContext appContext;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
-  private final TaskAttemptRecoveryData recoveryData;
+  private TaskAttemptRecoveryData recoveryData;
   private long launchTime = 0;
   private long finishTime = 0;
   private String trackerName;
@@ -1663,7 +1662,10 @@ public class TaskAttemptImpl implements TaskAttempt,
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
     @Override
     public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
-      if (attempt.leafVertex) {
+      boolean fromRecovery = (event instanceof RecoveryEvent
+          && ((RecoveryEvent) event).isFromRecovery());
+      attempt.recoveryData = null;
+      if (!fromRecovery && attempt.leafVertex) {
         return TaskAttemptStateInternal.SUCCEEDED;
       }
       // TODO - TEZ-834. This assumes that the outputs were on that node

http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index bdadf3f..32869a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -982,10 +984,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static class AttemptSucceededTransition
       implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
-    private boolean recoverSuccessTaskAttempt(TaskImpl task) {
+    private String recoverSuccessTaskAttempt(TaskImpl task) {
       // Found successful attempt
       // Recover data
-      boolean recoveredData = true;
+      String errorMsg = null;
       if (task.getVertex().getOutputCommitters() != null
           && !task.getVertex().getOutputCommitters().isEmpty()) {
         for (Entry<String, OutputCommitter> entry
@@ -995,28 +997,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
               + ", output=" + entry.getKey());
           OutputCommitter committer = entry.getValue();
           if (!committer.isTaskRecoverySupported()) {
-            LOG.info("Task recovery not supported by committer"
-                + ", failing task attempt"
+            errorMsg = "Task recovery not supported by committer"
+                + ", failing task attempt";
+            LOG.info(errorMsg
                 + ", taskId=" + task.getTaskId()
                 + ", attemptId=" + task.successfulAttempt
                 + ", output=" + entry.getKey());
-            recoveredData = false;
             break;
           }
           try {
             committer.recoverTask(task.getTaskId().getId(),
                 task.appContext.getApplicationAttemptId().getAttemptId()-1);
           } catch (Exception e) {
+            errorMsg = "Task recovery failed by committer: "
+                + ExceptionUtils.getStackTrace(e);
             LOG.warn("Task recovery failed by committer"
                 + ", taskId=" + task.getTaskId()
                 + ", attemptId=" + task.successfulAttempt
                 + ", output=" + entry.getKey(), e);
-            recoveredData = false;
             break;
           }
         }
       }
-      return recoveredData;
+      return errorMsg;
     }
 
     @Override
@@ -1026,13 +1029,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       // recovery. In that case just reschedule new attempt if numFailedAttempts does not exceeded maxFailedAttempts.
       if (task.recoveryData!= null
           && task.recoveryData.isTaskAttemptSucceeded(successTaId)) {
-        boolean recoveredData = recoverSuccessTaskAttempt(task);
-        if (!recoveredData) {
-          // Move this TA to KILLED (TEZ-2958)
-          LOG.info("Can not recovery the successful task attempt, schedule new task attempt,"
+        String errorMsg = recoverSuccessTaskAttempt(task);
+        if (errorMsg != null) {
+          LOG.info("Can not recover the successful task attempt, schedule new task attempt,"
               + "taskId=" + task.getTaskId());
           task.successfulAttempt = null;
           task.addAndScheduleAttempt(successTaId);
+          task.eventHandler.handle(new TaskAttemptEventAttemptKilled(successTaId,
+              errorMsg, TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true));
           return TaskStateInternal.RUNNING;
         } else {
           task.successfulAttempt = successTaId;

http://git-wip-us.apache.org/repos/asf/tez/blob/191447e0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 0a2613c..0b0af7b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -321,6 +321,7 @@ public class TestDAGRecovery {
     final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
     when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
     when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
+    when(appContext.getClock()).thenReturn(new SystemClock());
 
     Mockito.doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {
@@ -1052,7 +1053,7 @@ public class TestDAGRecovery {
    * V2's committer is not recovery supported
    */
   @Test//(timeout=5000)
-  public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() {
+  public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() throws Exception{
     initMockDAGRecoveryDataForTaskAttempt();
     // set up v2 recovery data
     // ta1t1v2: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED)
@@ -1097,8 +1098,8 @@ public class TestDAGRecovery {
     
     TaskImpl task = (TaskImpl)dag.getVertex(v2Id).getTask(t1v2Id);
     TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v2Id);
-    assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
-    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+    historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
     assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
     // new task attempt is scheduled
     assertEquals(2, task.getAttempts().size());