You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/26 19:43:49 UTC

git commit: TEZ-973. Abort additional attempts if recovery fails. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master f31aba7bc -> 5a6f42a81


TEZ-973. Abort additional attempts if recovery fails. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5a6f42a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5a6f42a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5a6f42a8

Branch: refs/heads/master
Commit: 5a6f42a8118ef47475b4522ae984810616914461
Parents: f31aba7
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 26 11:43:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 26 11:43:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  29 +++++-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  57 +++++++++--
 .../tez/dag/app/dag/DAGTerminationCause.java    |   5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 101 +++++++++++++------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  42 +++++---
 .../tez/dag/history/HistoryEventHandler.java    |  14 ++-
 .../dag/history/recovery/RecoveryService.java   |  44 +++++++-
 .../mapreduce/examples/OrderedWordCount.java    |   4 +
 8 files changed, 237 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 45a4f18..526bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -444,6 +444,11 @@ public class DAGAppMaster extends AbstractService {
             + ", dagState=" + finishEvt.getDAGState());
         lastDAGCompletionTime = clock.getTime();
         _updateLoggers(currentDAG, "_post");
+        if (this.historyEventHandler.hasRecoveryFailed()) {
+          LOG.warn("Recovery had a fatal error, shutting down session after" +
+              " DAG completion");
+          sessionStopped.set(true);
+        }
         switch(finishEvt.getDAGState()) {
         case SUCCEEDED:
           if (!currentDAG.getName().startsWith(
@@ -451,7 +456,6 @@ public class DAGAppMaster extends AbstractService {
             successfulDAGs.incrementAndGet();
           }
           break;
-        case ERROR:
         case FAILED:
           if (!currentDAG.getName().startsWith(
               TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
@@ -464,6 +468,11 @@ public class DAGAppMaster extends AbstractService {
             killedDAGs.incrementAndGet();
           }
           break;
+        case ERROR:
+          if (!currentDAG.getName().startsWith(
+              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+            failedDAGs.incrementAndGet();
+          }
         default:
           LOG.fatal("Received a DAG Finished Event with state="
               + finishEvt.getDAGState()
@@ -481,7 +490,11 @@ public class DAGAppMaster extends AbstractService {
           } else {
             LOG.info("Session shutting down now.");
             this.taskSchedulerEventHandler.setShouldUnregisterFlag();
-            state = DAGAppMasterState.SUCCEEDED;
+            if (this.historyEventHandler.hasRecoveryFailed()) {
+              state = DAGAppMasterState.FAILED;
+            } else {
+              state = DAGAppMasterState.SUCCEEDED;
+            }
             shutdownHandler.shutdown();
           }
         }
@@ -1418,7 +1431,17 @@ public class DAGAppMaster extends AbstractService {
 
     this.lastDAGCompletionTime = clock.getTime();
 
-    RecoveredDAGData recoveredDAGData = recoverDAG();
+    RecoveredDAGData recoveredDAGData;
+    try {
+      recoveredDAGData = recoverDAG();
+    } catch (IOException e) {
+      LOG.error("Error occurred when trying to recover data from previous attempt."
+          + " Shutting down AM", e);
+      this.state = DAGAppMasterState.ERROR;
+      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      shutdownHandler.shutdown();
+      return;
+    }
 
     if (!isSession) {
       LOG.info("In Non-Session mode.");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 7e1feca..45c98e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -60,6 +60,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
@@ -81,7 +82,7 @@ public class RecoveryParser {
   public RecoveryParser(DAGAppMaster dagAppMaster,
       FileSystem recoveryFS,
       Path recoveryDataDir,
-      int currentAttemptId) {
+      int currentAttemptId) throws IOException {
     this.dagAppMaster = dagAppMaster;
     this.recoveryFS = recoveryFS;
     this.recoveryDataDir = recoveryDataDir;
@@ -91,6 +92,8 @@ public class RecoveryParser {
     recoveryBufferSize = dagAppMaster.getConfig().getInt(
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+    this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
   }
 
   public static class RecoveredDAGData {
@@ -308,22 +311,57 @@ public class RecoveryParser {
     return inProgressDAG;
   }
 
-  private Path getPreviousAttemptRecoveryDataDir() {
+  private Path getPreviousAttemptRecoveryDataDir() throws IOException {
+    LOG.info("Looking for the correct attempt directory to recover from");
     int foundPreviousAttempt = -1;
     for (int i = currentAttemptId - 1; i > 0; --i) {
       Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+      LOG.info("Looking at attempt directory, path=" + attemptPath);
+      Path fatalErrorOccurred = new Path(attemptPath,
+          RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
+      if (recoveryFS.exists(fatalErrorOccurred)) {
+        throw new IOException("Found that a fatal error occurred in"
+            + " recovery during previous attempt, foundFile="
+            + fatalErrorOccurred.toString());
+      }
+
       Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag);
       try {
         if (recoveryFS.exists(dataRecoveredFile)) {
+          LOG.info("Found data recovered file in attempt directory"
+              + ", dataRecoveredFile=" + dataRecoveredFile
+              + ", path=" + attemptPath);
           foundPreviousAttempt = i;
           break;
         }
+        LOG.info("Skipping attempt directory as data recovered file does not exist"
+            + ", dataRecoveredFile=" + dataRecoveredFile
+            + ", path=" + attemptPath);
       } catch (IOException e) {
         LOG.warn("Exception when checking previous attempt dir for "
             + dataRecoveredFile.toString(), e);
       }
     }
     if (foundPreviousAttempt == -1) {
+      // Look for oldest summary file and use that
+      LOG.info("Did not find any attempt dir that had data recovered file."
+          + " Looking for oldest summary file");
+      for (int i = 1; i < currentAttemptId; ++i) {
+        Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+        Path summaryPath = getSummaryPath(attemptPath);
+        if (recoveryFS.exists(summaryPath)) {
+          LOG.info("Found summary file in attempt directory"
+              + ", summaryFile=" + summaryPath
+              + ", path=" + attemptPath);
+          foundPreviousAttempt = i;
+          break;
+        }
+        LOG.info("Skipping attempt directory as no summary file found"
+            + ", summaryFile=" + summaryPath
+            + ", path=" + attemptPath);
+      }
+    }
+    if (foundPreviousAttempt == -1) {
       LOG.info("Falling back to first attempt as no other recovered attempts"
           + " found");
       foundPreviousAttempt = 1;
@@ -466,6 +504,7 @@ public class RecoveryParser {
     if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
       LOG.info("Nothing to recover as previous attempt data does not exist"
           + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+      createDataRecoveredFlagFile();
       return null;
     }
 
@@ -476,6 +515,7 @@ public class RecoveryParser {
       LOG.info("Nothing to recover as summary file does not exist"
           + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
           + ", summaryPath=" + summaryPath.toString());
+      createDataRecoveredFlagFile();
       return null;
     }
 
@@ -832,18 +872,23 @@ public class RecoveryParser {
       }
     }
 
+    LOG.info("Finished copying data from previous attempt into current attempt");
+    createDataRecoveredFlagFile();
+
+    return recoveredDAGData;
+  }
+
+  private void createDataRecoveredFlagFile() throws IOException {
     Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
         dataRecoveredFileFlag);
-    LOG.info("Finished copying data from previous attempt into current attempt"
-        + " - setting flag by creating file"
-        + ", path=" + dataCopiedFlagPath.toString());
+    LOG.info("Trying to create data recovered flag file"
+        + ", filePath=" + dataCopiedFlagPath.toString());
     FSDataOutputStream flagFile =
         recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
     flagFile.writeInt(1);
     flagFile.hsync();
     flagFile.close();
 
-    return recoveredDAGData;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index f7020da..d01fb2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -38,6 +38,9 @@ public enum DAGTerminationCause {
   
   /** DAG failed during output commit. */
   COMMIT_FAILURE,
-  
+
+  /** DAG failed while trying to write recovery events */
+  RECOVERY_FAILURE,
+
   INTERNAL_ERROR
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ed73433..0e4c504 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -250,7 +250,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // Transitions from TERMINATING state.
           .addTransition
               (DAGState.TERMINATING,
-              EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED),
+              EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED,
+                  DAGState.ERROR),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
           .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
@@ -734,6 +735,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             new DAGCommitStartedEvent(getID(), clock.getTime())));
       } catch (IOException e) {
         LOG.error("Failed to send commit event to history/recovery handler", e);
+        trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
         return false;
       }
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
@@ -887,11 +889,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     finishTime = clock.getTime();
   }
 
-  void logJobHistoryFinishedEvent() {
+  void logJobHistoryFinishedEvent() throws IOException {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         finishTime, DAGState.SUCCEEDED, "", getAllCounters());
-    this.appContext.getHistoryHandler().handle(
+    this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }
 
@@ -909,12 +911,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(dagId, startEvt));
   }
 
-  void logJobHistoryUnsuccesfulEvent(DAGState state) {
+  void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
         getAllCounters());
-    this.appContext.getHistoryHandler().handle(
+    this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }
 
@@ -967,7 +969,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.addDiagnostic(diagnosticMsg);
         return dag.finished(DAGState.FAILED);
       }
-      
+      if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
+        String diagnosticMsg = "DAG failed due to failure in recovery handling." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        return dag.finished(DAGState.FAILED);
+      }
+
       // catch all
       String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
           + ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1016,14 +1026,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       finalState = DAGState.FAILED;
       trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
     }
-    
-    if (finalState == DAGState.SUCCEEDED) {
-      logJobHistoryFinishedEvent();
+
+    boolean recoveryError = false;
+    try {
+      if (finalState == DAGState.SUCCEEDED) {
+        logJobHistoryFinishedEvent();
+      } else {
+        logJobHistoryUnsuccesfulEvent(finalState);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to persist recovery event for DAG completion"
+          + ", dagId=" + dagId
+          + ", finalState=" + finalState);
+      recoveryError = true;
+    }
+
+    if (recoveryError) {
+      eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR));
     } else {
-      logJobHistoryUnsuccesfulEvent(finalState);
+      eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
     }
-    
-    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
 
     LOG.info("DAG: " + getID() + " finished with state: " + finalState);
     return finalState;
@@ -1339,10 +1361,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
                   desiredState));
             }
-            dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+            DAGState endState = DAGState.FAILED;
+            try {
+              dag.logJobHistoryUnsuccesfulEvent(endState);
+            } catch (IOException e) {
+              LOG.warn("Failed to persist recovery event for DAG completion"
+                  + ", dagId=" + dag.dagId
+                  + ", finalState=" + endState);
+            }
             dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
-                DAGState.FAILED));
-            return DAGState.FAILED;
+                endState));
+            return endState;
           }
 
           for (Vertex v : dag.vertices.values()) {
@@ -1387,9 +1416,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // Error state
           LOG.warn("Trying to recover DAG, failed to recover"
               + " from non-handled state" + dag.recoveredState);
+          // Tell AM ERROR so that it can shutdown
           dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
               DAGState.ERROR));
-          return DAGState.ERROR;
+          return DAGState.FAILED;
       }
     }
 
@@ -1494,23 +1524,27 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
   private static class KillNewJobTransition
-  implements SingleArcTransition<DAGImpl, DAGEvent> {
+      implements SingleArcTransition<DAGImpl, DAGEvent> {
+
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
-      job.setFinishTime();
-      job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-      job.finished(DAGState.KILLED);
+    public void transition(DAGImpl dag, DAGEvent dagEvent) {
+      dag.setFinishTime();
+      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+      dag.finished(DAGState.KILLED);
     }
+
   }
 
   private static class KillInitedJobTransition
-  implements SingleArcTransition<DAGImpl, DAGEvent> {
+      implements SingleArcTransition<DAGImpl, DAGEvent> {
+
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
-      job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-      job.addDiagnostic("Job received Kill in INITED state.");
-      job.finished(DAGState.KILLED);
+    public void transition(DAGImpl dag, DAGEvent dagEvent) {
+      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+      dag.addDiagnostic("Job received Kill in INITED state.");
+      dag.finished(DAGState.KILLED);
     }
+
   }
 
   private static class DAGKilledTransition
@@ -1610,6 +1644,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private boolean vertexSucceeded(Vertex vertex) {
     numSuccessfulVertices++;
     boolean failedCommit = false;
+    boolean recoveryFailed = false;
     if (!commitAllOutputsOnSuccess) {
       // committing successful outputs immediately. check for shared outputs
       List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName());
@@ -1640,6 +1675,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
                     clock.getTime())));
           } catch (IOException e) {
             LOG.error("Failed to send commit recovery event to handler", e);
+            recoveryFailed = true;
             failedCommit = true;
           }
           if (!failedCommit) {
@@ -1662,6 +1698,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
                     clock.getTime())));
           } catch (IOException e) {
             LOG.error("Failed to send commit recovery event to handler", e);
+            recoveryFailed = true;
             failedCommit = true;
           }
         }
@@ -1670,10 +1707,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     if (failedCommit) {
       LOG.info("Aborting job due to failure in commit.");
-      enactKill(DAGTerminationCause.COMMIT_FAILURE,
-          VertexTerminationCause.COMMIT_FAILURE);
+      if (!recoveryFailed) {
+        enactKill(DAGTerminationCause.COMMIT_FAILURE,
+            VertexTerminationCause.COMMIT_FAILURE);
+      } else {
+        LOG.info("Recovery failure occurred during commit");
+        enactKill(DAGTerminationCause.RECOVERY_FAILURE,
+            VertexTerminationCause.COMMIT_FAILURE);
+      }
     }
-    
+
     return !failedCommit;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7f94ed7..6f1398a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1276,12 +1276,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
-  void logJobHistoryVertexFailedEvent(VertexState state) {
+  void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
             getDiagnostics()), getAllCounters(), getVertexStats());
-    this.appContext.getHistoryHandler().handle(
+    this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
@@ -1331,10 +1331,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 }
                 if (firstCommit) {
                   // Log commit start event on first actual commit
-                  vertex.appContext.getHistoryHandler().handle(
-                      new DAGHistoryEvent(vertex.getDAGId(),
-                          new VertexCommitStartedEvent(vertex.vertexId,
-                              vertex.clock.getTime())));
+                  try {
+                    vertex.appContext.getHistoryHandler().handleCriticalEvent(
+                        new DAGHistoryEvent(vertex.getDAGId(),
+                            new VertexCommitStartedEvent(vertex.vertexId,
+                                vertex.clock.getTime())));
+                  } catch (IOException e) {
+                    LOG.error("Failed to persist commit start event to recovery, vertexId="
+                        + vertex.logIdentifier, e);
+                    vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
+                    return vertex.finished(VertexState.FAILED);
+                  }
                 } else {
                   firstCommit = false;
                 }
@@ -1430,24 +1437,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       case ERROR:
         eventHandler.handle(new DAGEvent(getDAGId(),
             DAGEventType.INTERNAL_ERROR));
-        logJobHistoryVertexFailedEvent(finalState);
+        try {
+          logJobHistoryVertexFailedEvent(finalState);
+        } catch (IOException e) {
+          LOG.error("Failed to send vertex finished event to recovery", e);
+        }
         break;
       case KILLED:
       case FAILED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState, terminationCause));
-        logJobHistoryVertexFailedEvent(finalState);
+        try {
+          logJobHistoryVertexFailedEvent(finalState);
+        } catch (IOException e) {
+          LOG.error("Failed to send vertex finished event to recovery", e);
+        }
         break;
       case SUCCEEDED:
-        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-            finalState));
         try {
           logJobHistoryVertexFinishedEvent();
+          eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+              finalState));
         } catch (IOException e) {
           LOG.error("Failed to send vertex finished event to recovery", e);
-          finalState = VertexState.ERROR;
-          eventHandler.handle(new DAGEvent(getDAGId(),
-              DAGEventType.INTERNAL_ERROR));
+          finalState = VertexState.FAILED;
+          this.terminationCause = VertexTerminationCause.INTERNAL_ERROR;
+          eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+              finalState));
         }
         break;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 413d4ef..4eb094f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -37,7 +37,6 @@ public class HistoryEventHandler extends CompositeService {
 
   private final AppContext context;
   private boolean yarnATSEnabled;
-  private AtomicBoolean stopped = new AtomicBoolean(false);
   private ATSService atsService;
   private RecoveryService recoveryService;
   private boolean recoveryEnabled;
@@ -114,12 +113,17 @@ public class HistoryEventHandler extends CompositeService {
     try {
       handleCriticalEvent(event);
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      LOG.warn("Failed to handle recovery event"
+          + ", eventType=" + event.getHistoryEvent().getEventType(), e);
     }
   }
 
-
-
-
+  public boolean hasRecoveryFailed() {
+    if (recoveryEnabled) {
+      return recoveryService.hasRecoveryFailed();
+    } else {
+      return false;
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 110da65..840ad1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -48,6 +48,9 @@ public class RecoveryService extends AbstractService {
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
   private final AppContext appContext;
 
+  public static final String RECOVERY_FATAL_OCCURRED_DIR =
+      "RecoveryFatalErrorOccurred";
+
   private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
@@ -69,6 +72,7 @@ public class RecoveryService extends AbstractService {
   private long lastFlushTime = -1;
   private int maxUnflushedEvents;
   private int flushInterval;
+  private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);
 
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
@@ -99,6 +103,13 @@ public class RecoveryService extends AbstractService {
         DAGHistoryEvent event;
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
 
+          if (recoveryFatalErrorOccurred.get()) {
+            LOG.error("Recovery failure occurred. Stopping recovery thread."
+                + " Current eventQueueSize=" + eventQueue.size());
+            eventQueue.clear();
+            return;
+          }
+
           // Log the size of the event-queue every so often.
           if (eventCounter != 0 && eventCounter % 1000 == 0) {
             LOG.info("Event queue stats"
@@ -169,6 +180,11 @@ public class RecoveryService extends AbstractService {
       return;
     }
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+    if (recoveryFatalErrorOccurred.get()) {
+      return;
+    }
+
     if (!started.get()) {
       LOG.warn("Adding event of type " + eventType
           + " to queue as service not started");
@@ -233,9 +249,29 @@ public class RecoveryService extends AbstractService {
             }
           }
         } catch (IOException ioe) {
-          LOG.warn("Error handling summary event"
+          LOG.error("Error handling summary event"
               + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
-          throw ioe;
+          Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
+          try {
+            LOG.error("Adding a flag to ensure next AM attempt does not start up"
+                + ", flagFile=" + fatalErrorDir.toString());
+            recoveryFatalErrorOccurred.set(true);
+            recoveryDirFS.mkdirs(fatalErrorDir);
+            if (recoveryDirFS.exists(fatalErrorDir)) {
+              LOG.error("Recovery failure occurred. Skipping all events");
+            } else {
+              // throw error if fatal error flag could not be set
+              throw ioe;
+            }
+          } catch (IOException e) {
+            LOG.fatal("Failed to create fatal error flag dir "
+                + fatalErrorDir.toString(), e);
+            throw ioe;
+          }
+          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+            // Throw error to tell client that dag submission failed
+            throw ioe;
+          }
         }
       }
     } else {
@@ -374,4 +410,8 @@ public class RecoveryService extends AbstractService {
     lastFlushTime = currentTime;
   }
 
+  public boolean hasRecoveryFailed() {
+    return recoveryFatalErrorOccurred.get();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 93a3a48..9874f7b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -509,11 +509,15 @@ public class OrderedWordCount {
             + dagStatus.getDiagnostics());
         }
       }
+    } catch (Exception e) {
+      LOG.error("Error occurred when submitting/running DAGs", e);
+      throw e;
     } finally {
       if (!retainStagingDir) {
         fs.delete(stagingDir, true);
       }
       if (useTezSession) {
+        LOG.info("Shutting down session");
         tezSession.stop();
       }
     }