You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:19 UTC

[4/4] git commit: TEZ-847. Support basic AM recovery. (hitesh)

TEZ-847. Support basic AM recovery. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5b464f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5b464f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5b464f27

Branch: refs/heads/master
Commit: 5b464f27da273723e422607518c271cd3f040560
Parents: 18290c8
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 5 15:34:53 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 5 15:34:53 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezSession.java  |    2 -
 .../apache/tez/dag/api/TezConfiguration.java    |   10 +-
 .../tez/dag/api/VertexManagerPluginContext.java |    5 +
 .../apache/tez/runtime/api/OutputCommitter.java |   18 +
 .../tez/runtime/api/OutputCommitterContext.java |   10 +
 tez-api/src/main/proto/DAGApiRecords.proto      |   15 +-
 .../tez/dag/api/client/VertexStatusBuilder.java |    2 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   72 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  613 ++++++++++
 .../java/org/apache/tez/dag/app/dag/DAG.java    |    3 +
 .../java/org/apache/tez/dag/app/dag/Task.java   |    3 +
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |    6 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |    5 +
 .../org/apache/tez/dag/app/dag/VertexState.java |    1 +
 .../tez/dag/app/dag/event/DAGEventType.java     |    6 +-
 .../dag/app/dag/event/TaskAttemptEventType.java |    3 +
 .../dag/app/dag/event/TaskEventRecoverTask.java |   41 +
 .../tez/dag/app/dag/event/TaskEventType.java    |    6 +-
 .../app/dag/event/VertexEventRecoverVertex.java |   36 +
 .../app/dag/event/VertexEventRouteEvent.java    |   14 +-
 .../event/VertexEventSourceVertexRecovered.java |   56 +
 .../tez/dag/app/dag/event/VertexEventType.java  |    8 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  465 +++++---
 .../dag/impl/OutputCommitterContextImpl.java    |   10 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   87 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  234 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1054 +++++++++++++++---
 .../tez/dag/app/dag/impl/VertexManager.java     |   30 +-
 .../tez/dag/history/HistoryEventType.java       |    5 +-
 .../apache/tez/dag/history/ats/ATSService.java  |    7 -
 .../tez/dag/history/events/AMLaunchedEvent.java |   14 +-
 .../tez/dag/history/events/AMStartedEvent.java  |    9 +
 .../history/events/ContainerLaunchedEvent.java  |   12 +
 .../history/events/DAGCommitStartedEvent.java   |   94 ++
 .../dag/history/events/DAGFinishedEvent.java    |   67 +-
 .../dag/history/events/DAGInitializedEvent.java |    7 +
 .../tez/dag/history/events/DAGStartedEvent.java |    7 +
 .../dag/history/events/DAGSubmittedEvent.java   |   11 +
 .../events/TaskAttemptFinishedEvent.java        |   51 +-
 .../history/events/TaskAttemptStartedEvent.java |   17 +-
 .../dag/history/events/TaskFinishedEvent.java   |   56 +-
 .../dag/history/events/TaskStartedEvent.java    |   14 +-
 .../events/VertexCommitStartedEvent.java        |   94 ++
 .../VertexDataMovementEventsGeneratedEvent.java |    8 +
 .../dag/history/events/VertexFinishedEvent.java |   59 +-
 .../history/events/VertexInitializedEvent.java  |   78 +-
 .../events/VertexParallelismUpdatedEvent.java   |  159 +++
 .../dag/history/events/VertexStartedEvent.java  |   14 +-
 .../dag/history/recovery/RecoveryService.java   |  171 ++-
 .../apache/tez/dag/recovery/RecoveryParser.java |  186 ----
 tez-dag/src/main/proto/HistoryEvents.proto      |   23 +-
 .../dag/api/client/TestVertexStatusBuilder.java |    8 +-
 .../TestHistoryEventsProtoConversion.java       |  569 ++++++++++
 tez-dist/pom.xml                                |   12 +-
 .../mapreduce/committer/MROutputCommitter.java  |   34 +-
 .../tez/runtime/api/impl/EventMetaData.java     |    2 +-
 .../apache/tez/runtime/api/impl/InputSpec.java  |    4 +
 .../vertexmanager/ShuffleVertexManager.java     |    4 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  135 +++
 .../apache/tez/test/dag/MultiAttemptDAG.java    |  177 +++
 60 files changed, 4259 insertions(+), 664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 8f3101e..9e29910 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -116,8 +116,6 @@ public class TezSession {
               sessionConfig.getTezConfiguration(), applicationId,
               null, sessionName, sessionConfig.getAMConfiguration(),
               tezJarResources, sessionCredentials);
-      // Set Tez Sessions to not retry on AM crashes
-      appContext.setMaxAppAttempts(1);
       yarnClient.submitApplication(appContext);
     } catch (YarnException e) {
       throw new TezException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 142d2d9..29d04ab 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -369,12 +369,20 @@ public class TezConfiguration extends Configuration {
 
   public static final String DAG_RECOVERY_ENABLED =
       TEZ_PREFIX + "dag.recovery.enabled";
-  public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = false;
+  public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
 
   public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
       TEZ_PREFIX + "dag.recovery.io.buffer.size";
   public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
 
+  public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
+      TEZ_PREFIX + "dag.recovery.max.unflushed.events";
+  public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
+
+  public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
+      TEZ_PREFIX + "dag.recovery.flush.interval.secs";
+  public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
+
   public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
   public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
   public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index a281b90..76202f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -125,4 +125,9 @@ public interface VertexManagerPluginContext {
    * @param locationHint
    */
   public void setVertexLocationHint(VertexLocationHint locationHint);
+
+  /**
+   * Get the attempt number of the DAG being executed.
+   * <p>
+   * NOTE(review): presumably this is greater than 1 when the DAG is being
+   * re-run by a restarted AM (recovery) — confirm against the implementation.
+   *
+   * @return DAG Attempt number
+   */
+  public int getDAGAttemptNumber();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 301d01b..aadbf12 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -69,4 +69,22 @@ public abstract class OutputCommitter {
   public abstract void abortOutput(VertexStatus.State finalState)
     throws Exception;
 
+  /**
+   * Whether the OutputCommitter supports recovery of output from a Task
+   * that completed in a previous DAG attempt.
+   * <p>
+   * The default implementation returns {@code true}. Committers whose task
+   * output cannot be reused by a later DAG attempt should override this and
+   * return {@code false}.
+   *
+   * @return True if recovery supported
+   */
+  public boolean isTaskRecoverySupported() {
+    return true;
+  }
+
+  /**
+   * Recover task output from a previous DAG attempt.
+   * <p>
+   * The default implementation does nothing. NOTE(review): presumably only
+   * invoked when {@link #isTaskRecoverySupported()} returns {@code true} —
+   * confirm with the caller in the DAG AM.
+   *
+   * @param taskIndex Index of task to be recovered
+   * @param previousDAGAttempt Previous DAG Attempt Number
+   * @throws java.lang.Exception if an implementation fails to recover the
+   *         task's output
+   */
+  public void recoverTask(int taskIndex, int previousDAGAttempt)  throws Exception {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 3132758..b5837e8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
@@ -63,4 +65,12 @@ public interface OutputCommitterContext {
    */
   public byte[] getUserPayload();
 
+  /**
+   * Get the index, within its DAG, of the vertex this committer context
+   * belongs to.
+   *
+   * @return Vertex index
+   */
+  @Unstable
+  @Evolving
+  public int getVertexIndex();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 5faa7f1..c7a317e 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -171,13 +171,14 @@ message ProgressProto {
 enum VertexStatusStateProto {
   VERTEX_NEW = 0;
   VERTEX_INITIALIZING = 1;
   VERTEX_INITED = 2;
   VERTEX_RUNNING = 3;
   VERTEX_SUCCEEDED = 4;
   VERTEX_FAILED = 5;
   VERTEX_KILLED = 6;
   VERTEX_ERROR = 7;
   VERTEX_TERMINATING = 8;
+  VERTEX_RECOVERING = 9; // appended, not inserted: renumbering existing values breaks wire compatibility
 }
 
 message VertexStatusProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 0e693b8..dc24f7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -66,6 +66,8 @@ public class VertexStatusBuilder extends VertexStatus {
         return VertexStatusStateProto.VERTEX_NEW;
       case INITIALIZING:
         return VertexStatusStateProto.VERTEX_INITIALIZING;
+      case RECOVERING:
+        return VertexStatusStateProto.VERTEX_NEW;
       case INITED:
         return VertexStatusStateProto.VERTEX_INITED;
       case RUNNING:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ce07fb..c8185b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -48,6 +48,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -220,6 +222,8 @@ public class DAGAppMaster extends AbstractService {
   private FileSystem recoveryFS;
   private int recoveryBufferSize;
 
+  protected boolean isLastAMRetry = false;
+
   // DAG Counter
   private final AtomicInteger dagCounter = new AtomicInteger();
 
@@ -262,6 +266,14 @@ public class DAGAppMaster extends AbstractService {
   @Override
   public synchronized void serviceInit(final Configuration conf) throws Exception {
 
+    int maxAppAttempts = 1;
+    String maxAppAttemptsEnv = System.getenv(
+        ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
+    if (maxAppAttemptsEnv != null) {
+      maxAppAttempts = Integer.valueOf(maxAppAttemptsEnv);
+    }
+    isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+
     this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
@@ -481,6 +493,11 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  /**
+   * Make the given DAG the AM's current DAG, recording it both in this
+   * DAGAppMaster and in the AM context.
+   */
+  public void setCurrentDAG(DAG currentDAG) {
+    this.currentDAG = currentDAG;
+    context.setDAG(this.currentDAG);
+  }
+
   private class DAGAppMasterEventHandler implements
       EventHandler<DAGAppMasterEvent> {
     @Override
@@ -539,7 +556,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   /** Create and initialize (but don't start) a single dag. */
-  protected DAG createDAG(DAGPlan dagPB, TezDAGID dagId) {
+  DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {
     if (dagId == null) {
       dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
           dagCounter.incrementAndGet());
@@ -566,7 +583,7 @@ public class DAGAppMaster extends AbstractService {
     TokenCache.setSessionToken(sessionToken, dagCredentials);
 
     // create single dag
-    DAG newDag =
+    DAGImpl newDag =
         new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, dagCredentials, clock,
             appMasterUgi.getShortUserName(),
@@ -989,6 +1006,7 @@ public class DAGAppMaster extends AbstractService {
         return TezSessionStatus.INITIALIZING;
       case IDLE:
         return TezSessionStatus.READY;
+      case RECOVERING:
       case RUNNING:
         return TezSessionStatus.RUNNING;
       case ERROR:
@@ -1328,6 +1346,31 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  /**
+   * Try to recover an in-progress DAG left behind by a previous AM attempt.
+   * Does nothing unless recovery is enabled and this is not the first attempt.
+   * While the recovery data is parsed the AM is in RECOVERING state. Afterwards
+   * it is RUNNING if a DAG was found (and a DAG_RECOVER event was dispatched),
+   * or IDLE if there was nothing to recover.
+   *
+   * @return the recovered DAG, or null if nothing was recovered
+   * @throws IOException if reading the recovery data fails
+   */
+  private DAG recoverDAG() throws IOException {
+    if (!recoveryEnabled || this.appAttemptID.getAttemptId() <= 1) {
+      return null;
+    }
+    this.state = DAGAppMasterState.RECOVERING;
+    RecoveryParser parser = new RecoveryParser(
+        this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+    DAG dag = parser.parseRecoveryData();
+    if (dag == null) {
+      LOG.info("No DAG to recover");
+      this.state = DAGAppMasterState.IDLE;
+      return null;
+    }
+    LOG.info("Found DAG to recover, dagId=" + dag.getID());
+    _updateLoggers(dag, "");
+    dagEventDispatcher.handle(
+        new DAGEvent(dag.getID(), DAGEventType.DAG_RECOVER));
+    this.state = DAGAppMasterState.RUNNING;
+    return dag;
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
   public synchronized void serviceStart() throws Exception {
 
@@ -1348,18 +1391,18 @@ public class DAGAppMaster extends AbstractService {
     this.lastDAGCompletionTime = clock.getTime();
 
     if (!isSession) {
-      startDAG();
+      DAG recoveredDAG = null;
+      if (appAttemptID.getAttemptId() != 1) {
+        recoveredDAG = recoverDAG();
+      }
+      if (recoveredDAG == null) {
+        dagCounter.set(0);
+        startDAG();
+      }
     } else {
       LOG.info("In Session mode. Waiting for DAG over RPC");
       this.state = DAGAppMasterState.IDLE;
-
-      if (recoveryEnabled) {
-        if (this.appAttemptID.getAttemptId() > 0) {
-          // Recovery data and copy over into new recovery dir
-          this.state = DAGAppMasterState.IDLE;
-          // TODO
-        }
-      }
+      recoverDAG();
 
       this.dagSubmissionTimer = new Timer(true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@@ -1570,6 +1613,9 @@ public class DAGAppMaster extends AbstractService {
       }
 
       appMaster.stop();
+
+
+
     }
   }
 
@@ -1672,4 +1718,8 @@ public class DAGAppMaster extends AbstractService {
   private void sendEvent(Event<?> event) {
     dispatcher.getEventHandler().handle(event);
   }
+
+  /**
+   * Reset the DAG id counter. createDAG assigns new ids by incrementing this
+   * counter, so setting it to the highest id already used keeps new ids fresh.
+   */
+  synchronized void setDAGCounter(int counter) {
+    dagCounter.set(counter);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
new file mode 100644
index 0000000..9e59849
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class RecoveryParser {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
+
+  private final DAGAppMaster dagAppMaster;
+  private final FileSystem recoveryFS;
+  private final Path recoveryDataDir;
+  private final Path currentAttemptRecoveryDataDir;
+  private final int recoveryBufferSize;
+  private final int currentAttemptId;
+
+  private static final String dataRecoveredFileFlag = "dataRecovered";
+
+  /**
+   * Create a parser that reads recovery data written by an earlier AM attempt
+   * and copies it into this attempt's recovery directory.
+   *
+   * @param dagAppMaster the owning AM; supplies configuration and the
+   *                     application attempt id
+   * @param recoveryFS file system holding the recovery data
+   * @param recoveryDataDir root recovery directory; each attempt writes into
+   *                        a sub-directory named after its attempt id
+   * @param currentAttemptId attempt id of the running AM
+   */
+  public RecoveryParser(DAGAppMaster dagAppMaster,
+      FileSystem recoveryFS,
+      Path recoveryDataDir,
+      int currentAttemptId) {
+    Configuration amConf = dagAppMaster.getConfig();
+    this.recoveryBufferSize = amConf.getInt(
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+    this.dagAppMaster = dagAppMaster;
+    this.recoveryFS = recoveryFS;
+    this.recoveryDataDir = recoveryDataDir;
+    this.currentAttemptId = currentAttemptId;
+    this.currentAttemptRecoveryDataDir =
+        getAttemptRecoveryDataDir(recoveryDataDir, currentAttemptId);
+  }
+
+  /**
+   * Log every record of a recovery summary file. Used by {@link #main} for
+   * offline inspection; the stream is not closed here.
+   *
+   * @param inputStream stream positioned at the start of a summary file
+   * @throws IOException if a record cannot be read or names an event type
+   *                     ordinal outside {@link HistoryEventType}
+   */
+  private static void parseSummaryFile(FSDataInputStream inputStream)
+      throws IOException {
+    HistoryEventType[] eventTypes = HistoryEventType.values();
+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      if (proto == null) {
+        // parseDelimitedFrom returns null when the stream is already at EOF
+        break;
+      }
+      int eventTypeOrdinal = proto.getEventType();
+      if (eventTypeOrdinal < 0 || eventTypeOrdinal >= eventTypes.length) {
+        // Validate like getNextEvent does: an unchecked index would surface
+        // as ArrayIndexOutOfBoundsException rather than an IOException
+        throw new IOException("Corrupt data found when trying to read summary event type"
+            + ", eventTypeOrdinal=" + eventTypeOrdinal);
+      }
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + eventTypes[eventTypeOrdinal]);
+    }
+  }
+
+  /**
+   * Read the next history event from a DAG recovery stream. Each record is an
+   * int holding the {@link HistoryEventType} ordinal, followed by the event's
+   * own encoding, which is decoded by {@link HistoryEvent#fromProtoStream}.
+   *
+   * @param inputStream stream positioned at the start of a record
+   * @return the decoded event; never null
+   * @throws IOException if reading fails (e.g. truncated data), the type
+   *                     ordinal is out of range, or the event type has no
+   *                     recoverable event class
+   */
+  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
+      throws IOException {
+    HistoryEventType[] eventTypes = HistoryEventType.values();
+    int eventTypeOrdinal = inputStream.readInt();
+    if (eventTypeOrdinal < 0 || eventTypeOrdinal >= eventTypes.length) {
+      // An out-of-range type prefix means the record is corrupt (e.g. garbage
+      // or a partially written record); report it instead of indexing past
+      // the enum.
+      throw new IOException("Corrupt data found when trying to read next event type"
+          + ", eventTypeOrdinal=" + eventTypeOrdinal);
+    }
+    HistoryEventType eventType = eventTypes[eventTypeOrdinal];
+    HistoryEvent event;
+    switch (eventType) {
+      case AM_LAUNCHED:
+        event = new AMLaunchedEvent();
+        break;
+      case AM_STARTED:
+        event = new AMStartedEvent();
+        break;
+      case DAG_SUBMITTED:
+        event = new DAGSubmittedEvent();
+        break;
+      case DAG_INITIALIZED:
+        event = new DAGInitializedEvent();
+        break;
+      case DAG_STARTED:
+        event = new DAGStartedEvent();
+        break;
+      case DAG_COMMIT_STARTED:
+        event = new DAGCommitStartedEvent();
+        break;
+      case DAG_FINISHED:
+        event = new DAGFinishedEvent();
+        break;
+      case CONTAINER_LAUNCHED:
+        event = new ContainerLaunchedEvent();
+        break;
+      case VERTEX_INITIALIZED:
+        event = new VertexInitializedEvent();
+        break;
+      case VERTEX_STARTED:
+        event = new VertexStartedEvent();
+        break;
+      case VERTEX_PARALLELISM_UPDATED:
+        event = new VertexParallelismUpdatedEvent();
+        break;
+      case VERTEX_COMMIT_STARTED:
+        event = new VertexCommitStartedEvent();
+        break;
+      case VERTEX_FINISHED:
+        event = new VertexFinishedEvent();
+        break;
+      case TASK_STARTED:
+        event = new TaskStartedEvent();
+        break;
+      case TASK_FINISHED:
+        event = new TaskFinishedEvent();
+        break;
+      case TASK_ATTEMPT_STARTED:
+        event = new TaskAttemptStartedEvent();
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        event = new TaskAttemptFinishedEvent();
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        event = new VertexDataMovementEventsGeneratedEvent();
+        break;
+      default:
+        // A valid type that has no event class that can be rebuilt from the stream
+        throw new IOException("Invalid data found, unknown event type "
+            + eventType);
+
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsing event from input stream"
+          + ", eventType=" + eventType);
+    }
+    event.fromProtoStream(inputStream);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsed event from input stream"
+          + ", eventType=" + eventType
+          + ", event=" + event.toString());
+    }
+    return event;
+  }
+
+
+
+
+
+  /**
+   * Log every history event in a DAG recovery file, for offline inspection
+   * via {@link #main}. The stream is not closed here.
+   */
+  private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+      throws IOException {
+    while (inputStream.available() > 0) {
+      HistoryEvent parsed = getNextEvent(inputStream);
+      HistoryEventType parsedType = parsed.getEventType();
+      LOG.info("Parsed event from recovery stream"
+          + ", eventType=" + parsedType
+          + ", event=" + parsed);
+    }
+  }
+
+  /**
+   * Directory under {@code recoveryDataDir} holding the recovery data of the
+   * given AM attempt; it is named after the attempt id.
+   */
+  private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
+      int attemptId) {
+    String attemptDirName = String.valueOf(attemptId);
+    return new Path(recoveryDataDir, attemptDirName);
+  }
+
+  /**
+   * Command-line tool that dumps recovery data to the log.
+   * <p>
+   * Usage: {@code RecoveryParser <summaryFile> [<dagRecoveryFile> ...]}
+   *
+   * @param argv summary file path, followed by zero or more DAG recovery
+   *             file paths
+   * @throws IOException if a file cannot be opened or parsed
+   */
+  public static void main(String argv[]) throws IOException {
+    if (argv.length < 1) {
+      // Previously argv[0] threw ArrayIndexOutOfBoundsException here
+      System.err.println(
+          "Usage: RecoveryParser <summaryFile> [<dagRecoveryFile> ...]");
+      System.exit(1);
+    }
+    Configuration conf = new Configuration();
+    String summaryPath = argv[0];
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Parsing Summary file " + summaryPath);
+    FSDataInputStream summaryStream = fs.open(new Path(summaryPath));
+    try {
+      parseSummaryFile(summaryStream);
+    } finally {
+      summaryStream.close();
+    }
+    for (int i = 1; i < argv.length; ++i) {
+      String dagPath = argv[i];
+      LOG.info("Parsing DAG recovery file " + dagPath);
+      FSDataInputStream dagStream = fs.open(new Path(dagPath));
+      try {
+        parseDAGRecoveryFile(dagStream);
+      } finally {
+        dagStream.close();
+      }
+    }
+  }
+
+  /**
+   * Path of the summary file inside the given recovery directory; the file
+   * is named after the application id.
+   */
+  private Path getSummaryPath(Path recoveryDataDir) {
+    String appId = dagAppMaster.getAttemptID().getApplicationId().toString();
+    String summaryFileName =
+        appId + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX;
+    return new Path(recoveryDataDir, summaryFileName);
+  }
+
+  /**
+   * Create the summary file at the given path, overwriting any existing file.
+   */
+  private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
+      throws IOException {
+    final boolean overwrite = true;
+    return recoveryFS.create(summaryPath, overwrite, recoveryBufferSize);
+  }
+
+  /**
+   * Open the summary file for reading.
+   *
+   * @return an open stream, or null if the file does not exist
+   */
+  private FSDataInputStream getSummaryStream(Path summaryPath)
+      throws IOException {
+    return recoveryFS.exists(summaryPath)
+        ? recoveryFS.open(summaryPath, recoveryBufferSize)
+        : null;
+  }
+
+  /**
+   * Path of the per-DAG recovery file inside the given recovery directory;
+   * the file is named after the DAG id.
+   */
+  private Path getDAGRecoveryFilePath(Path recoveryDataDir,
+      TezDAGID dagID) {
+    String recoveryFileName =
+        dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX;
+    return new Path(recoveryDataDir, recoveryFileName);
+  }
+
+  /**
+   * Open the recovery file of the given DAG for reading.
+   *
+   * @return an open stream, or null if the DAG has no recovery file in
+   *         {@code recoveryDataDir}
+   */
+  private FSDataInputStream getDAGRecoveryStream(Path recoveryDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    Path recoveryFile = getDAGRecoveryFilePath(recoveryDataDir, dagID);
+    return recoveryFS.exists(recoveryFile)
+        ? recoveryFS.open(recoveryFile, recoveryBufferSize)
+        : null;
+  }
+
+  /**
+   * Create the recovery file for the given DAG inside the given recovery
+   * directory, overwriting any existing file. The path comes from
+   * {@link #getDAGRecoveryFilePath}, so reads and writes agree on the naming.
+   */
+  private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID);
+    return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
+  }
+
+  /**
+   * Find the DAG that was submitted but has not finished.
+   *
+   * @param seenDAGs map from DAG id to {@code true} if the DAG finished, or
+   *                 {@code false} if it is still in progress
+   * @return the in-progress DAG id, or null if every DAG finished
+   * @throws IllegalStateException if more than one DAG is in progress; the
+   *         recovery logic assumes at most one unfinished DAG
+   */
+  private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
+    TezDAGID inProgressDAG = null;
+    for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
+      if (entry.getValue()) {
+        continue;
+      }
+      if (inProgressDAG != null) {
+        throw new IllegalStateException("Multiple in progress DAGs seen"
+            + ", dagId=" + inProgressDAG
+            + ", dagId=" + entry.getKey());
+      }
+      inProgressDAG = entry.getKey();
+    }
+    return inProgressDAG;
+  }
+
+  /**
+   * Locate the recovery directory of the most recent earlier attempt that
+   * contains the data-recovered flag file. Falls back to attempt 1 when no
+   * earlier attempt has the flag. An IOException while probing an attempt is
+   * logged, and that attempt is skipped.
+   */
+  private Path getPreviousAttemptRecoveryDataDir() {
+    int chosenAttempt = -1;
+    int candidate = currentAttemptId - 1;
+    while (candidate > 0 && chosenAttempt == -1) {
+      Path candidateDir = getAttemptRecoveryDataDir(recoveryDataDir, candidate);
+      Path flagFile = new Path(candidateDir, dataRecoveredFileFlag);
+      try {
+        if (recoveryFS.exists(flagFile)) {
+          chosenAttempt = candidate;
+        }
+      } catch (IOException e) {
+        LOG.warn("Exception when checking previous attempt dir for "
+            + flagFile.toString(), e);
+      }
+      --candidate;
+    }
+    if (chosenAttempt == -1) {
+      LOG.info("Falling back to first attempt as no other recovered attempts"
+          + " found");
+      chosenAttempt = 1;
+    }
+    return getAttemptRecoveryDataDir(recoveryDataDir, chosenAttempt);
+  }
+
+
+  public DAG parseRecoveryData() throws IOException {
+    Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
+    LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+        + " for recovering data from previous attempt");
+    if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
+      LOG.info("Nothing to recover as previous attempt data does not exist"
+          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+      return null;
+    }
+
+    Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
+    FSDataInputStream summaryStream = getSummaryStream(
+        summaryPath);
+    if (summaryStream == null) {
+      LOG.info("Nothing to recover as summary file does not exist"
+          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
+          + ", summaryPath=" + summaryPath.toString());
+      return null;
+    }
+
+    Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
+    FSDataOutputStream newSummaryStream =
+        getSummaryOutputStream(newSummaryPath);
+
+    Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
+
+    FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
+    LOG.info("Parsing summary file"
+        + ", path=" + summaryPath.toString()
+        + ", len=" + summaryFileStatus.getLen()
+        + ", lastModTime=" + summaryFileStatus.getModificationTime());
+
+    int dagCounter = 0;
+    while (summaryStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+      HistoryEventType eventType =
+          HistoryEventType.values()[proto.getEventType()];
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[RECOVERY SUMMARY]"
+            + " dagId=" + proto.getDagId()
+            + ", timestamp=" + proto.getTimestamp()
+            + ", event=" + eventType);
+      }
+      TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
+      if (dagCounter < dagId.getId()) {
+        dagCounter = dagId.getId();
+      }
+      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+        seenDAGs.put(dagId, false);
+      } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+        seenDAGs.put(dagId, true);
+      }
+      proto.writeDelimitedTo(newSummaryStream);
+    }
+    newSummaryStream.hsync();
+    newSummaryStream.close();
+
+    // Set counter for next set of DAGs
+    dagAppMaster.setDAGCounter(dagCounter);
+
+    TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+    if (lastInProgressDAG == null) {
+      LOG.info("Nothing to recover as no uncompleted DAGs found");
+      return null;
+    }
+
+    LOG.info("Trying to recover dag from recovery file"
+        + ", dagId=" + lastInProgressDAG.toString()
+        + ", dataDir=" + previousAttemptRecoveryDataDir
+        + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
+
+    FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
+        previousAttemptRecoveryDataDir, lastInProgressDAG);
+    if (dagRecoveryStream == null) {
+      // Could not find data to recover
+      // Error out
+      throw new IOException("Could not find recovery data for last in progress DAG"
+          + ", dagId=" + lastInProgressDAG);
+    }
+
+    DAGImpl recoveredDAG = null;
+
+    LOG.info("Copying DAG data into Current Attempt directory"
+        + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
+        lastInProgressDAG));
+    FSDataOutputStream newDAGRecoveryStream =
+        getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+
+    while (dagRecoveryStream.available() > 0) {
+      HistoryEvent event;
+      try {
+        event = getNextEvent(dagRecoveryStream);
+      } catch (IOException ioe) {
+        LOG.warn("Corrupt data found when trying to read next event", ioe);
+        break;
+      }
+      if (event == null) {
+        // reached end of data
+        break;
+      }
+      HistoryEventType eventType = event.getEventType();
+      switch (eventType) {
+        case DAG_SUBMITTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+              lastInProgressDAG);
+          dagAppMaster.setCurrentDAG(recoveredDAG);
+          break;
+        }
+        case DAG_INITIALIZED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case DAG_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case DAG_COMMIT_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case DAG_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          // If this is seen, nothing to recover
+          assert recoveredDAG != null;
+          recoveredDAG.restoreFromEvent(event);
+          return recoveredDAG;
+        }
+        case CONTAINER_LAUNCHED:
+        {
+          // Nothing to do?
+          break;
+        }
+        case VERTEX_INITIALIZED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        case VERTEX_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexStartedEvent vEvent = (VertexStartedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        case VERTEX_PARALLELISM_UPDATED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        case VERTEX_COMMIT_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        case VERTEX_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        case TASK_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          TaskStartedEvent tEvent = (TaskStartedEvent) event;
+          Task task = recoveredDAG.getVertex(
+              tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+          task.restoreFromEvent(tEvent);
+          break;
+        }
+        case TASK_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
+          Task task = recoveredDAG.getVertex(
+              tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+          task.restoreFromEvent(tEvent);
+          break;
+        }
+        case TASK_ATTEMPT_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
+          Task task =
+              recoveredDAG.getVertex(
+                  tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                      .getTask(tEvent.getTaskAttemptID().getTaskID());
+          task.restoreFromEvent(tEvent);
+          break;
+        }
+        case TASK_ATTEMPT_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
+          Task task =
+              recoveredDAG.getVertex(
+                  tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                  .getTask(tEvent.getTaskAttemptID().getTaskID());
+          task.restoreFromEvent(tEvent);
+          break;
+        }
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAG != null;
+          VertexDataMovementEventsGeneratedEvent vEvent =
+              (VertexDataMovementEventsGeneratedEvent) event;
+          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          v.restoreFromEvent(vEvent);
+          break;
+        }
+        default:
+          throw new RuntimeException("Invalid data found, unknown event type "
+              + eventType);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[DAG RECOVERY]"
+            + " dagId=" + lastInProgressDAG
+            + ", eventType=" + eventType
+            + ", event=" + event.toString());
+      }
+      newDAGRecoveryStream.writeInt(eventType.ordinal());
+      event.toProtoStream(newDAGRecoveryStream);
+    }
+    newDAGRecoveryStream.hsync();
+    newDAGRecoveryStream.close();
+
+    Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
+        dataRecoveredFileFlag);
+    LOG.info("Finished copying data from previous attempt into current attempt"
+        + " - setting flag by creating file"
+        + ", path=" + dataCopiedFlagPath.toString());
+    FSDataOutputStream flagFile =
+        recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
+    flagFile.writeInt(1);
+    flagFile.hsync();
+    flagFile.close();
+
+    return recoveredDAG;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8117619..45fb50a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
@@ -84,4 +85,6 @@ public interface DAG {
   Credentials getCredentials();
   
   UserGroupInformation getDagUGI();
+
+  DAGState restoreFromEvent(HistoryEvent historyEvent);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 8243b70..ac96681 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -80,4 +82,5 @@ public interface Task {
   
   public List<String> getDiagnostics();
 
+  TaskState restoreFromEvent(HistoryEvent historyEvent);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 0cc9163..2af1232 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -27,6 +27,7 @@ import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -36,7 +37,7 @@ import org.apache.tez.dag.records.TezVertexID;
  * Read only view of TaskAttempt.
  */
 public interface TaskAttempt {
-  
+
   public static class TaskAttemptStatus {
     public TaskAttemptState state;
     public DAGCounter localityCounter;
@@ -118,4 +119,7 @@ public interface TaskAttempt {
   public Task getTask();
   
   public boolean getIsRescheduled();
+
+  TaskAttemptState restoreFromEvent(HistoryEvent event);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9e7a0a7..807e9b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,6 +37,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
@@ -114,4 +116,7 @@ public interface Vertex extends Comparable<Vertex> {
   // TODO remove this once RootInputVertexManager is fixed to not use
   // internal apis
   AppContext getAppContext();
+
+  VertexState restoreFromEvent(HistoryEvent event);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 5a7af0a..7130c7a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -27,4 +27,5 @@ public enum VertexState {
   KILLED,
   ERROR,
   TERMINATING,
+  RECOVERING,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 476c688..741dcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -26,7 +26,7 @@ public enum DAGEventType {
   //Producer:Client
   DAG_KILL,
 
-  //Producer:MRAppMaster
+  //Producer:AM
   DAG_INIT,
   DAG_START,
 
@@ -44,4 +44,8 @@ public enum DAGEventType {
   DAG_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
   DAG_COUNTER_UPDATE,
+
+  // Event to trigger recovery
+  // Producer:AM
+  DAG_RECOVER
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 018bd3d..db3fd3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -60,5 +60,8 @@ public enum TaskAttemptEventType {
   
   // Producer: consumer destination vertex
   TA_OUTPUT_FAILED,
+
+  // Recovery
+  TA_RECOVER,
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
new file mode 100644
index 0000000..e7e59e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Task event of type {@link TaskEventType#T_RECOVER}, used during AM recovery
+ * to ask a task to restore its state.
+ *
+ * <p>Instances are immutable once constructed.
+ */
+public class TaskEventRecoverTask extends TaskEvent {
+
+  /** Requested target state for the task; {@code null} if none was given. */
+  final TaskState desiredState;
+
+  /**
+   * @param taskID id of the task to recover
+   * @param desiredState requested target state for the task; may be
+   *     {@code null}
+   */
+  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) {
+    super(taskID, TaskEventType.T_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /**
+   * Creates a recover event without a requested target state
+   * ({@link #getDesiredState()} returns {@code null}). How a {@code null}
+   * state is interpreted is up to the receiving task implementation.
+   *
+   * @param taskID id of the task to recover
+   */
+  public TaskEventRecoverTask(TezTaskID taskID) {
+    this(taskID, null);
+  }
+
+  /** @return the requested target state, or {@code null} if none was given */
+  public TaskState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index a0b99a9..4830ae0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -40,5 +40,9 @@ public enum TaskEventType {
   T_ATTEMPT_OUTPUT_CONSUMABLE,
   T_ATTEMPT_FAILED,
   T_ATTEMPT_SUCCEEDED,
-  T_ATTEMPT_KILLED
+  T_ATTEMPT_KILLED,
+
+  // Recovery event
+  T_RECOVER
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
new file mode 100644
index 0000000..34e45fe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_RECOVER}, used during AM
+ * recovery to ask a vertex to restore its state.
+ *
+ * <p>Instances are immutable once constructed.
+ */
+public class VertexEventRecoverVertex extends VertexEvent {
+
+  /** Requested target state for the vertex. */
+  final VertexState desiredState;
+
+  /**
+   * @param vertexId id of the vertex to recover
+   * @param desiredState requested target state for the vertex
+   */
+  public VertexEventRecoverVertex(TezVertexID vertexId, VertexState desiredState) {
+    super(vertexId, VertexEventType.V_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /** @return the requested target state passed at construction */
+  public VertexState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index a872ae2..69195db 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -27,13 +27,25 @@ public class VertexEventRouteEvent extends VertexEvent {
   
   final List<TezEvent> events;
 
+  final boolean recovered;
+
   public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+    this(vertexId, events, false);
+  }
+
+  public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events,
+      boolean recovered) {
     super(vertexId, VertexEventType.V_ROUTE_EVENT);
     this.events = events;
+    this.recovered = recovered;
   }
-  
+
   public List<TezEvent> getEvents() {
     return events;
   }
 
+  public boolean isRecovered() {
+    return recovered;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
new file mode 100644
index 0000000..5e61369
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+
+import java.util.List;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_SOURCE_VERTEX_RECOVERED},
+ * sent to a vertex to report the recovery outcome of one of its source
+ * vertices.
+ *
+ * <p>The fields are final, but the attempt list is stored by reference (not
+ * copied), so callers should not mutate it after constructing the event.
+ */
+public class VertexEventSourceVertexRecovered extends VertexEvent {
+
+  /** State the source vertex was recovered into. */
+  final VertexState sourceVertexState;
+  /** Id of the source vertex that finished recovering. */
+  final TezVertexID sourceVertexID;
+  // Presumably the source vertex's attempts that had already completed before
+  // the restart -- TODO confirm against the producer of this event.
+  final List<TezTaskAttemptID> completedTaskAttempts;
+
+  /**
+   * @param vertexID id of the vertex receiving this event
+   * @param sourceVertexID id of the recovered source vertex
+   * @param sourceVertexState state the source vertex was recovered into
+   * @param completedTaskAttempts completed task attempts reported for the
+   *     source vertex; stored as-is, not copied
+   */
+  public VertexEventSourceVertexRecovered(TezVertexID vertexID,
+      TezVertexID sourceVertexID,
+      VertexState sourceVertexState,
+      List<TezTaskAttemptID> completedTaskAttempts) {
+    super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
+    this.sourceVertexState = sourceVertexState;
+    this.sourceVertexID = sourceVertexID;
+    this.completedTaskAttempts = completedTaskAttempts;
+  }
+
+  /** @return the state the source vertex was recovered into */
+  public VertexState getSourceVertexState() {
+    return sourceVertexState;
+  }
+
+  /** @return the id of the recovered source vertex */
+  public TezVertexID getSourceVertexID() {
+    return sourceVertexID;
+  }
+
+  /** @return the completed task attempts passed at construction (same list) */
+  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+    return completedTaskAttempts;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 0cf14eb..69952d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -56,5 +56,11 @@ public enum VertexEventType {
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,
   V_ROOT_INPUT_FAILED,
-  
+
+  // Recover Event, Producer:DAG
+  V_RECOVER,
+
+  // Recover Event, Producer:Vertex
+  V_SOURCE_VERTEX_RECOVERED
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 04ed223..432c189 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -90,12 +90,15 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -157,6 +160,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private final List<String> diagnostics = new ArrayList<String>();
 
+  // Recovery related flags
+  boolean recoveryInitEventSeen = false;
+  boolean recoveryStartEventSeen = false;
+
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
@@ -176,6 +183,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           .addTransition(DAGState.NEW, DAGState.NEW,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(DAGState.NEW,
+              EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING,
+                  DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
+                  DAGState.ERROR, DAGState.TERMINATING),
+              DAGEventType.DAG_RECOVER,
+              new RecoverTransition())
           .addTransition(DAGState.NEW, DAGState.NEW,
               DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
@@ -342,7 +355,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   
   Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
-  
+  private DAGState recoveredState = DAGState.NEW;
+  private boolean recoveryCommitInProgress = false;
+
   static class VertexGroupInfo {
     String groupName;
     Set<String> groupMembers;
@@ -466,6 +481,46 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public DAGState restoreFromEvent(HistoryEvent historyEvent) {
+    // Replays one DAG-level history event read back from the recovery log
+    // and returns the DAG state implied by the events replayed so far.
+    // Expected log order:
+    //   DAG_INITIALIZED -> DAG_STARTED -> [DAG_COMMIT_STARTED] -> DAG_FINISHED
+    // Ordering violations raise IllegalStateException; any other event type
+    // raises IllegalArgumentException. Both are RuntimeExceptions, so callers
+    // that handled the previous generic RuntimeException are unaffected.
+    switch (historyEvent.getEventType()) {
+      case DAG_INITIALIZED:
+        // Re-run DAG initialization, taking the init time from the event.
+        recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
+        recoveryInitEventSeen = true;
+        return recoveredState;
+      case DAG_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new IllegalStateException("Started Event seen but"
+              + " no Init Event was encountered earlier"
+              + ", dagId=" + dagId);
+        }
+        recoveryStartEventSeen = true;
+        this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
+        recoveredState = DAGState.RUNNING;
+        return recoveredState;
+      case DAG_COMMIT_STARTED:
+        // A commit can only have begun while the DAG was running.
+        if (recoveredState != DAGState.RUNNING) {
+          throw new IllegalStateException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState
+              + ", dagId=" + dagId);
+        }
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case DAG_FINISHED:
+        if (!recoveryStartEventSeen) {
+          throw new IllegalStateException("Finished Event seen but"
+              + " no Start Event was encountered earlier"
+              + ", dagId=" + dagId);
+        }
+        // A finish record means any commit seen earlier has completed.
+        recoveryCommitInProgress = false;
+        DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
+        this.finishTime = finishedEvent.getFinishTime();
+        recoveredState = finishedEvent.getState();
+        return recoveredState;
+      default:
+        throw new IllegalArgumentException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType()
+            + ", dagId=" + dagId);
+    }
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();
@@ -666,6 +721,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     boolean failedWhileCommitting = false;
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
       // commit all shared outputs
+      appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+          new DAGCommitStartedEvent(getID())));
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
         if (failedWhileCommitting) {
           break;
@@ -820,7 +877,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   void logJobHistoryFinishedEvent() {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
+        finishTime, DAGState.SUCCEEDED, "", getAllCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -839,7 +896,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(dagId, startEvt));
   }
 
-  void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
+  void logJobHistoryUnsuccesfulEvent(DAGState state) {
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
@@ -947,11 +1004,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
     }
     
-    DAGStatus.State logState = getDAGStatusFromState(finalState);
-    if (logState == DAGStatus.State.SUCCEEDED) {
+    if (finalState == DAGState.SUCCEEDED) {
       logJobHistoryFinishedEvent();
     } else {
-      logJobHistoryUnsuccesfulEvent(logState);
+      logJobHistoryUnsuccesfulEvent(finalState);
     }
     
     eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
@@ -1047,179 +1103,294 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return null;
   }
 
-  // TODO Recovery
-  /*
-  @Override
-  public List<AMInfo> getAMInfos() {
-    return amInfos;
+  /**
+   * Initializes this DAG without a recovered DAGInitializedEvent; equivalent
+   * to {@code initializeDAG(null)}, so the init time is read from the clock.
+   */
+  public DAGState initializeDAG() {
+    return initializeDAG(null);
   }
-  */
-  
-  private static class InitTransition
-      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
-    /**
-     * Note that this transition method is called directly (and synchronously)
-     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
-     * just plain sequential call within AM context), so we can trigger
-     * modifications in AM state from here (at least, if AM is written that
-     * way; MR version is).
-     */
-    @Override
-    public DAGState transition(DAGImpl dag, DAGEvent event) {
-      // TODO Metrics
-      //dag.metrics.submittedJob(dag);
-      //dag.metrics.preparingJob(dag);
+  /**
+   * Builds the runtime structure of this DAG from its plan: vertex groups,
+   * vertices, edges, the DAG scheduler and shared group outputs.
+   *
+   * @param event the DAGInitializedEvent being replayed during recovery, or
+   *     {@code null} for a fresh initialization. When non-null, its init time
+   *     is reused instead of reading the clock.
+   * @return {@link DAGState#INITED} on success. If the plan has no vertices:
+   *     {@link DAGState#FAILED} when {@code event} is non-null, otherwise the
+   *     value returned by {@code finished(DAGState.FAILED)}.
+   */
+  DAGState initializeDAG(DAGInitializedEvent event) {
+    if (event != null) {
+      initTime = event.getInitTime();
+    } else {
+      initTime = clock.getTime();
+    }
 
-      dag.initTime = dag.clock.getTime();
-      dag.commitAllOutputsOnSuccess = dag.conf.getBoolean(
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+    commitAllOutputsOnSuccess = conf.getBoolean(
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+
+    // If we have no vertices, fail the dag
+    numVertices = getJobPlan().getVertexCount();
+    if (numVertices == 0) {
+      addDiagnostic("No vertices for dag");
+      trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
+      // When replaying a recovered event, return FAILED without calling
+      // finished(). NOTE(review): presumably the recovery path performs the
+      // terminal bookkeeping itself - confirm.
+      if (event != null) {
+        return DAGState.FAILED;
+      }
+      return finished(DAGState.FAILED);
+    }
 
-      // If we have no vertices, fail the dag
-      dag.numVertices = dag.getJobPlan().getVertexCount();
-      if (dag.numVertices == 0) {
-        dag.addDiagnostic("No vertices for dag");
-        dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-        return dag.finished(DAGState.FAILED);
+    // Index vertex groups by name, then record, per member vertex, the list
+    // of groups it belongs to.
+    if (jobPlan.getVertexGroupsCount() > 0) {
+      for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
+        vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
       }
-      
-      if (dag.jobPlan.getVertexGroupsCount() > 0) {
-        for (PlanVertexGroupInfo groupInfo : dag.jobPlan.getVertexGroupsList()) {
-          dag.vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
-        }
-        for (VertexGroupInfo groupInfo : dag.vertexGroups.values()) {
-          for (String vertexName : groupInfo.groupMembers) {
-            List<VertexGroupInfo> groupList = dag.vertexGroupInfo.get(vertexName);
-            if (groupList == null) {
-              groupList = Lists.newLinkedList();
-              dag.vertexGroupInfo.put(vertexName, groupList);
-            }
-            groupList.add(groupInfo);
+      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+        for (String vertexName : groupInfo.groupMembers) {
+          List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
+          if (groupList == null) {
+            groupList = Lists.newLinkedList();
+            vertexGroupInfo.put(vertexName, groupList);
           }
+          groupList.add(groupInfo);
         }
       }
+    }
 
-      // create the vertices`
-      for (int i=0; i < dag.numVertices; ++i) {
-        String vertexName = dag.getJobPlan().getVertex(i).getName();
-        VertexImpl v = createVertex(dag, vertexName, i);
-        dag.addVertex(v);
-      }
+    // create the vertices
+    for (int i=0; i < numVertices; ++i) {
+      String vertexName = getJobPlan().getVertex(i).getName();
+      VertexImpl v = createVertex(this, vertexName, i);
+      addVertex(v);
+    }
 
-      createDAGEdges(dag);
-      Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+    // Edges must exist before parseVertexEdges() below, which looks them up
+    // by edge id.
+    createDAGEdges(this);
+    Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
 
-      // setup the dag
-      for (Vertex v : dag.vertices.values()) {
-        parseVertexEdges(dag, edgePlans, v);
-      }
+    // setup the dag
+    for (Vertex v : vertices.values()) {
+      parseVertexEdges(this, edgePlans, v);
+    }
 
-      // Initialize the edges, now that the payload and vertices have been set.
-      for (Edge e : dag.edges.values()) {
-        e.initialize();
-      }
+    // Initialize the edges, now that the payload and vertices have been set.
+    for (Edge e : edges.values()) {
+      e.initialize();
+    }
 
-      assignDAGScheduler(dag);
-      
-      for (Map.Entry<String, VertexGroupInfo> entry : dag.vertexGroups.entrySet()) {
-        String groupName = entry.getKey();
-        VertexGroupInfo groupInfo = entry.getValue();
-        if (!groupInfo.outputs.isEmpty()) {
-          // shared outputs
-          for (String vertexName : groupInfo.groupMembers) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Setting shared outputs for group: " + groupName + 
-                  " on vertex: " + vertexName);
-            }
-            Vertex v = dag.getVertex(vertexName);
-            v.addSharedOutputs(groupInfo.outputs);
+    assignDAGScheduler(this);
+
+    // Register each group's outputs (if any) as shared outputs on every
+    // member vertex of that group.
+    for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
+      String groupName = entry.getKey();
+      VertexGroupInfo groupInfo = entry.getValue();
+      if (!groupInfo.outputs.isEmpty()) {
+        // shared outputs
+        for (String vertexName : groupInfo.groupMembers) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting shared outputs for group: " + groupName +
+                " on vertex: " + vertexName);
           }
+          Vertex v = getVertex(vertexName);
+          v.addSharedOutputs(groupInfo.outputs);
         }
       }
+    }
+    return DAGState.INITED;
+  }
 
-      // TODO Metrics
-      //dag.metrics.endPreparingJob(dag);
-      dag.logJobHistoryInitedEvent();
-      return DAGState.INITED;
-
+  /**
+   * Creates one {@link Edge} per edge in the DAG plan and stores it in
+   * {@code dag.edges} keyed by the plan's edge id. Source and destination
+   * vertices are not set here.
+   *
+   * <p>Static, like the sibling helpers, because it only operates on the
+   * {@code dag} argument.
+   *
+   * @param dag the DAG whose plan is read and whose edge map is populated
+   */
+  private static void createDAGEdges(DAGImpl dag) {
+    for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
+      EdgeProperty edgeProperty = DagTypeConverters
+          .createEdgePropertyMapFromDAGPlan(edgePlan);
+
+      // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
+      // referencing the fake edge manager within the API module.
+      if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
+          && edgeProperty.getEdgeManagerDescriptor() == null) {
+        EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+            NullEdgeManager.class.getName());
+        edgeProperty = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
+            edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
+            edgeProperty.getEdgeDestination());
+      }
 
+      // edge manager may be also set via API when using custom edge type
+      dag.edges.put(edgePlan.getId(),
+          new Edge(edgeProperty, dag.getEventHandler()));
     }
+  }
 
-    private void createDAGEdges(DAGImpl dag) {
-      for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
-        EdgeProperty edgeProperty = DagTypeConverters
-            .createEdgePropertyMapFromDAGPlan(edgePlan);
-        
-        // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
-        // referencing the fake edge manager within the API module.
-        if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
-            && edgeProperty.getEdgeManagerDescriptor() == null) {
-          EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-              NullEdgeManager.class.getName());
-          EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
-              edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
-              edgeProperty.getEdgeDestination());
-          edgeProperty = ep;
-        }
-        
-        // edge manager may be also set via API when using custom edge type
-        dag.edges.put(edgePlan.getId(),
-            new Edge(edgeProperty, dag.getEventHandler()));
-      }
+  /**
+   * Installs a {@link DAGSchedulerNaturalOrder}, bound to the DAG's event
+   * handler, as the scheduler of {@code dag}.
+   */
+  private static void assignDAGScheduler(DAGImpl dag) {
+    LOG.info("Using Natural order dag scheduler");
+    DAGSchedulerNaturalOrder naturalOrderScheduler =
+        new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+    dag.dagScheduler = naturalOrderScheduler;
+  }
+
+  /**
+   * Builds the {@link VertexImpl} for the vertex at index {@code vId} of the
+   * DAG plan, passing along the DAG's configuration, handlers, clock, context,
+   * the plan's task location hints and the DAG's vertex groups.
+   */
+  private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
+    TezVertexID id = TezBuilderUtils.newVertexID(dag.getID(), vId);
+    VertexPlan plan = dag.getJobPlan().getVertex(vId);
+    VertexLocationHint locationHint =
+        DagTypeConverters.convertFromDAGPlan(plan.getTaskLocationHintList());
+    return new VertexImpl(id, plan, vertexName, dag.conf,
+        dag.eventHandler, dag.taskAttemptListener,
+        dag.clock, dag.taskHeartbeatHandler,
+        !dag.commitAllOutputsOnSuccess, dag.appContext, locationHint,
+        dag.vertexGroups);
+  }
+
+  /**
+   * Hooks up {@code vertex} to its input and output edges. For each in/out
+   * edge id in the vertex plan it sets the edge's source and destination
+   * vertex, then registers the resulting vertex-to-edge maps on the vertex.
+   *
+   * <p>NOTE(review): assumes {@code dag.edges} already holds an Edge for
+   * every referenced id; a missing entry would cause a NullPointerException.
+   *
+   * @param dag the DAG providing the vertex-name and edge-id lookups
+   * @param edgePlans edge plans keyed by edge id
+   * @param vertex the vertex whose edges are wired up
+   */
+  private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+    VertexPlan vertexPlan = vertex.getVertexPlan();
+
+    Map<Vertex, Edge> inVertices =
+        new HashMap<Vertex, Edge>();
+
+    Map<Vertex, Edge> outVertices =
+        new HashMap<Vertex, Edge>();
+
+    // Incoming edges: upstream vertex -> this vertex.
+    for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(inEdgeId);
+      Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+      Edge edge = dag.edges.get(inEdgeId);
+      edge.setSourceVertex(inVertex);
+      edge.setDestinationVertex(vertex);
+      inVertices.put(inVertex, edge);
     }
 
-    private void assignDAGScheduler(DAGImpl dag) {
-      LOG.info("Using Natural order dag scheduler");
-      dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+    // Outgoing edges: this vertex -> downstream vertex.
+    for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(outEdgeId);
+      Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+      Edge edge = dag.edges.get(outEdgeId);
+      edge.setSourceVertex(vertex);
+      edge.setDestinationVertex(outVertex);
+      outVertices.put(outVertex, edge);
     }
 
-    private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
-      TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+    vertex.setInputVertices(inVertices);
+    vertex.setOutputVertices(outVertices);
+  }
 
-      VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
-      VertexLocationHint vertexLocationHint = DagTypeConverters
-          .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+  /**
+   * Drives a DAG re-created during AM recovery to the state implied by
+   * {@code dag.recoveredState}. It dispatches recovery events to the vertices
+   * and, for terminal outcomes, notifies the AM that the DAG has finished.
+   */
+  private static class RecoverTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      switch (dag.recoveredState) {
+        case NEW:
+          // send DAG an Init and start events
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+          return DAGState.NEW;
+        case INITED:
+          // DAG inited but not started.
+          // The vertices may already have been sent init/start events but the
+          // DAG start may not have been persisted, so drive the root vertices
+          // to RUNNING through recovery.
+          recoverRootVerticesToRunning(dag);
+          return DAGState.RUNNING;
+        case RUNNING:
+          if (dag.recoveryCommitInProgress) {
+            // A commit started but its completion was never seen. Commits are
+            // not recoverable, so the DAG is failed.
+            dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
+            dag.setFinishTime();
+            // NOTE(review): the earlier code derived the vertex end state from
+            // recoveredState, but recoveredState is always RUNNING on this
+            // path, so every vertex was recovered as SUCCEEDED. That outcome is
+            // kept explicitly here; confirm it is intended.
+            recoverAllVertices(dag, VertexState.SUCCEEDED);
+            dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+            dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+                DAGState.FAILED));
+            return DAGState.FAILED;
+          }
+          recoverRootVerticesToRunning(dag);
+          return DAGState.RUNNING;
+        case SUCCEEDED:
+        case ERROR:
+        case FAILED:
+        case KILLED:
+          // Completed: recover every vertex into the matching final state.
+          recoverAllVertices(dag, toVertexFinalState(dag.recoveredState));
 
-      VertexImpl v = new VertexImpl(
-          vertexId, vertexPlan, vertexName, dag.conf,
-          dag.eventHandler, dag.taskAttemptListener, 
-          dag.clock, dag.taskHeartbeatHandler,
-          !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-          dag.vertexGroups);
-      return v;
+          // Let us inform AM of completion
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              dag.recoveredState));
+
+          LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
+              + dag.recoveredState);
+          return dag.recoveredState;
+        default:
+          // Error state
+          LOG.warn("Trying to recover DAG, failed to recover"
+              + " from non-handled state, state=" + dag.recoveredState);
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              DAGState.ERROR));
+          return DAGState.ERROR;
+      }
     }
 
-    // hooks up this VertexImpl to input and output EdgeProperties
-    private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
-      VertexPlan vertexPlan = vertex.getVertexPlan();
+    /** Sends a RUNNING recovery event to every vertex that has no input vertices. */
+    private static void recoverRootVerticesToRunning(DAGImpl dag) {
+      for (Vertex v : dag.vertices.values()) {
+        if (v.getInputVerticesCount() == 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending Running Recovery event to root vertex "
+                + v.getName());
+          }
+          dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+              VertexState.RUNNING));
+        }
+      }
+    }
+
+    /**
+     * Maps a DAG state to the final state its vertices are recovered into:
+     * KILLED to KILLED, ERROR/FAILED to FAILED, anything else to SUCCEEDED.
+     */
+    private static VertexState toVertexFinalState(DAGState dagState) {
+      switch (dagState) {
+        case KILLED:
+          return VertexState.KILLED;
+        case ERROR:
+        case FAILED:
+          return VertexState.FAILED;
+        default:
+          return VertexState.SUCCEEDED;
+      }
+    }
+
+    /** Sends a recovery event carrying {@code desiredState} to every vertex. */
+    private static void recoverAllVertices(DAGImpl dag, VertexState desiredState) {
+      for (Vertex v : dag.vertices.values()) {
+        dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+            desiredState));
+      }
+    }
+  }
 
-      Map<Vertex, Edge> inVertices =
-          new HashMap<Vertex, Edge>();
+  /**
+   * Transition that initializes the DAG via {@code initializeDAG()} and, on
+   * success, records the DAG-initialized history event.
+   */
+  private static class InitTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
-      Map<Vertex, Edge> outVertices =
-          new HashMap<Vertex, Edge>();
+    /**
+     * Note that this transition method is called directly (and synchronously)
+     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
+     * just plain sequential call within AM context), so we can trigger
+     * modifications in AM state from here (at least, if AM is written that
+     * way; MR version is).
+     *
+     * NOTE(review): this description was inherited from MapReduce; confirm
+     * which Tez AM component invokes this transition and whether the
+     * synchronous-call assumption still holds.
+     */
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      // TODO Metrics
+      //dag.metrics.submittedJob(dag);
+      //dag.metrics.preparingJob(dag);
 
-      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(inEdgeId);
-        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
-        Edge edge = dag.edges.get(inEdgeId);
-        edge.setSourceVertex(inVertex);
-        edge.setDestinationVertex(vertex);
-        inVertices.put(inVertex, edge);
+      DAGState state = dag.initializeDAG();
+      // Initialization did not reach INITED: return its state without
+      // logging the inited history event.
+      if (state != DAGState.INITED) {
+        return state;
       }
 
-      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(outEdgeId);
-        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
-        Edge edge = dag.edges.get(outEdgeId);
-        edge.setSourceVertex(vertex);
-        edge.setDestinationVertex(outVertex);
-        outVertices.put(outVertex, edge);
-      }
+      // TODO Metrics
+      //dag.metrics.endPreparingJob(dag);
+      dag.logJobHistoryInitedEvent();
+      return DAGState.INITED;
+
 
-      vertex.setInputVertices(inVertices);
-      vertex.setOutputVertices(outVertices);
     }
 
   } // end of InitTransition
@@ -1427,6 +1598,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         for (VertexGroupInfo groupInfo : commitList) {
           groupInfo.committed = true;
           Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+              new DAGCommitStartedEvent(dagId)));
           for (String outputName : groupInfo.outputs) {
             OutputCommitter committer = v.getOutputCommitters().get(outputName);
             LOG.info("Committing output: " + outputName);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index 070bd75..ff9e401 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -31,13 +31,15 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
   private final String vertexName;
   private final String outputName;
   private final byte[] userPayload;
+  private final int vertexIdx;
 
   public OutputCommitterContextImpl(ApplicationId applicationId,
       int dagAttemptNumber,
       String dagName,
       String vertexName,
       String outputName,
-      @Nullable byte[] userPayload) {
+      @Nullable byte[] userPayload,
+      int vertexIdx) {
     checkNotNull(applicationId, "applicationId is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
@@ -48,6 +50,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
     this.vertexName = vertexName;
     this.outputName = outputName;
     this.userPayload = userPayload;
+    this.vertexIdx = vertexIdx;
   }
 
   @Override
@@ -80,4 +83,9 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
     return userPayload;
   }
 
+  /**
+   * @return the vertex index supplied to this context's constructor
+   */
+  @Override
+  public int getVertexIndex() {
+    return this.vertexIdx;
+  }
+
 }