You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:16 UTC

[1/4] TEZ-847. Support basic AM recovery. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 18290c848 -> 5b464f27d


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
new file mode 100644
index 0000000..f03863c
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.RuntimeUtils;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Verifies that every {@link HistoryEventType} can be written with
+ * {@code toProtoStream} and read back with {@code fromProtoStream} while
+ * preserving the fields each per-event check asserts on.
+ */
+public class TestHistoryEventsProtoConversion {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestHistoryEventsProtoConversion.class);
+
+  /**
+   * Serializes {@code event} to an in-memory proto stream, then creates a new
+   * instance of the same class by name and populates it from those bytes.
+   *
+   * @param event the event to round-trip
+   * @return a new event deserialized from {@code event}'s serialized form
+   * @throws IOException propagated from serialization or deserialization
+   */
+  private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    event.toProtoStream(os);
+    os.flush();
+    os.close();
+    byte[] serialized = os.toByteArray();
+    HistoryEvent deserializedEvent = RuntimeUtils.createClazzInstance(
+        event.getClass().getName());
+    LOG.info("Serialized event to byte array"
+        + ", eventType=" + event.getEventType()
+        + ", bufLen=" + serialized.length);
+    deserializedEvent.fromProtoStream(
+        new ByteArrayInputStream(serialized));
+    return deserializedEvent;
+  }
+
+  /** Logs the toString() of the original and the deserialized event. */
+  private void logEvents(HistoryEvent event,
+      HistoryEvent deserializedEvent) {
+    LOG.info("Initial Event toString: " + event.toString());
+    LOG.info("Deserialized Event toString: " + deserializedEvent.toString());
+  }
+
+  /** Round-trips an AMLaunchedEvent; checks attempt id, submit time and launch time. */
+  private void testAMLaunchedEvent() throws Exception {
+    AMLaunchedEvent event = new AMLaunchedEvent(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1),
+        100, 100);
+    AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getAppSubmitTime(),
+        deserializedEvent.getAppSubmitTime());
+    Assert.assertEquals(event.getLaunchTime(),
+        deserializedEvent.getLaunchTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips an AMStartedEvent; checks attempt id and start time. */
+  private void testAMStartedEvent() throws Exception {
+    AMStartedEvent event = new AMStartedEvent(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 100);
+    AMStartedEvent deserializedEvent = (AMStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a DAGSubmittedEvent; checks ids, DAG name, submit time and plan. */
+  private void testDAGSubmittedEvent() throws Exception {
+    DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
+        ApplicationId.newInstance(0, 1), 1), 1001L,
+        DAGPlan.newBuilder().setName("foo").build(),
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getDAGName(),
+        deserializedEvent.getDAGName());
+    Assert.assertEquals(event.getSubmitTime(),
+        deserializedEvent.getSubmitTime());
+    Assert.assertEquals(event.getDAGPlan(),
+        deserializedEvent.getDAGPlan());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a DAGInitializedEvent; checks DAG id and init time. */
+  private void testDAGInitializedEvent() throws Exception {
+    DAGInitializedEvent event = new DAGInitializedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334L);
+    DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getInitTime(), deserializedEvent.getInitTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a DAGStartedEvent; checks DAG id and start time. */
+  private void testDAGStartedEvent() throws Exception {
+    DAGStartedEvent event = new DAGStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334L);
+    DAGStartedEvent deserializedEvent = (DAGStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips DAGFinishedEvents without and with diagnostics/counters. */
+  private void testDAGFinishedEvent() throws Exception {
+    {
+      DAGFinishedEvent event = new DAGFinishedEvent(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000L, 20000L,
+          DAGState.FAILED, null, null);
+      DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(
+          event.getDagID(),
+          deserializedEvent.getDagID());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      // The start time is expected NOT to survive the round trip.
+      Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+      Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TezCounters tezCounters = new TezCounters();
+      tezCounters.addGroup("foo", "bar");
+      tezCounters.getGroup("foo").addCounter("c1", "c1", 100);
+      tezCounters.getGroup("foo").findCounter("c1").increment(1);
+      DAGFinishedEvent event = new DAGFinishedEvent(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000L, 20000L,
+          DAGState.FAILED, "bad diagnostics", tezCounters);
+      DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(
+          event.getDagID(),
+          deserializedEvent.getDagID());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+      Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      Assert.assertEquals(101,
+          deserializedEvent.getTezCounters().getGroup("foo").findCounter("c1").getValue());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  /** Round-trips a VertexInitializedEvent; checks id, times, task count and inputs. */
+  private void testVertexInitializedEvent() throws Exception {
+    VertexInitializedEvent event = new VertexInitializedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+        "vertex1", 1000L, 15000L, 100, "procName", null);
+    VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(event.getInitRequestedTime(),
+        deserializedEvent.getInitRequestedTime());
+    Assert.assertEquals(event.getInitedTime(),
+        deserializedEvent.getInitedTime());
+    Assert.assertEquals(event.getNumTasks(),
+        deserializedEvent.getNumTasks());
+    Assert.assertEquals(event.getAdditionalInputs(),
+        deserializedEvent.getAdditionalInputs());
+    // The processor name is expected not to be serialized, so it reads back as null.
+    Assert.assertNull(deserializedEvent.getProcessorName());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a VertexStartedEvent; checks id, start-requested and start times. */
+  private void testVertexStartedEvent() throws Exception {
+    VertexStartedEvent event = new VertexStartedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+        145553L, 12334455L);
+    VertexStartedEvent deserializedEvent = (VertexStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(event.getStartRequestedTime(),
+        deserializedEvent.getStartRequestedTime());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips VertexParallelismUpdatedEvents without and with hints/edge managers. */
+  private void testVertexParallelismUpdatedEvent() throws Exception {
+    {
+      VertexParallelismUpdatedEvent event =
+          new VertexParallelismUpdatedEvent(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              100, null, null);
+      VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+      Assert.assertEquals(event.getSourceEdgeManagers(),
+          deserializedEvent.getSourceEdgeManagers());
+      Assert.assertEquals(event.getVertexLocationHint(),
+          deserializedEvent.getVertexLocationHint());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      Map<String,EdgeManagerDescriptor> sourceEdgeManagers
+          = new LinkedHashMap<String, EdgeManagerDescriptor>();
+      sourceEdgeManagers.put("foo", new EdgeManagerDescriptor("bar"));
+      sourceEdgeManagers.put("foo1", new EdgeManagerDescriptor("bar1").setUserPayload(
+          "payload".getBytes("UTF-8")));
+      VertexParallelismUpdatedEvent event =
+          new VertexParallelismUpdatedEvent(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              100, new VertexLocationHint(Arrays.asList(new TaskLocationHint(
+                  new HashSet<String>(Arrays.asList("h1")),
+              new HashSet<String>(Arrays.asList("r1"))))),
+              sourceEdgeManagers);
+
+      VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+      Assert.assertEquals(event.getSourceEdgeManagers().size(),
+          deserializedEvent.getSourceEdgeManagers().size());
+      Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
+          deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
+      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo").getUserPayload(),
+          deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+      Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
+      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload());
+      Assert.assertEquals(event.getVertexLocationHint(),
+          deserializedEvent.getVertexLocationHint());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  /** Round-trips VertexFinishedEvents without and with diagnostics/counters. */
+  private void testVertexFinishedEvent() throws Exception {
+    {
+      VertexFinishedEvent event =
+          new VertexFinishedEvent(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              "vertex1", 1000L, 15000L, 16000L, 20000L, 1344400L, VertexState.ERROR,
+              null, null);
+      VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexFinishedEvent event =
+          new VertexFinishedEvent(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              "vertex1", 1000L, 15000L, 16000L, 20000L, 1344400L, VertexState.ERROR,
+              "diagnose", new TezCounters());
+      VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  /** Round-trips a TaskStartedEvent; checks task id, scheduled and start times. */
+  private void testTaskStartedEvent() throws Exception {
+    TaskStartedEvent event = new TaskStartedEvent(
+        TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+        "vertex1", 1000L, 100000L);
+    TaskStartedEvent deserializedEvent = (TaskStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+    Assert.assertEquals(event.getScheduledTime(),
+        deserializedEvent.getScheduledTime());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips TaskFinishedEvents without and with a successful attempt/counters. */
+  private void testTaskFinishedEvent() throws Exception {
+    {
+      TaskFinishedEvent event = new TaskFinishedEvent(
+          TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+          "vertex1", 11000L, 1000000L, null, TaskState.FAILED, null);
+      TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getTezCounters(),
+          deserializedEvent.getTezCounters());
+      Assert.assertEquals(event.getSuccessfulAttemptID(),
+          deserializedEvent.getSuccessfulAttemptID());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TaskFinishedEvent event = new TaskFinishedEvent(
+          TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+          "vertex1", 11000L, 1000000L,
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          TaskState.FAILED, new TezCounters());
+      TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getTezCounters(),
+          deserializedEvent.getTezCounters());
+      Assert.assertEquals(event.getSuccessfulAttemptID(),
+          deserializedEvent.getSuccessfulAttemptID());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  /** Round-trips a TaskAttemptStartedEvent; checks attempt, container, node and time. */
+  private void testTaskAttemptStartedEvent() throws Exception {
+    TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+        "vertex1", 10009L, ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+        "host1", 19999), "inProgress", "Completed");
+    TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getTaskAttemptID(),
+        deserializedEvent.getTaskAttemptID());
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getNodeId(),
+        deserializedEvent.getNodeId());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips TaskAttemptFinishedEvents without and with diagnostics/counters. */
+  private void testTaskAttemptFinishedEvent() throws Exception {
+    {
+      TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          "vertex1", 10001L, 1000434444L, TaskAttemptState.FAILED,
+          null, null);
+      TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskAttemptID(),
+          deserializedEvent.getTaskAttemptID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(),
+          deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getCounters(),
+          deserializedEvent.getCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          "vertex1", 10001L, 1000434444L, TaskAttemptState.FAILED,
+          "diagnose", new TezCounters());
+      TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskAttemptID(),
+          deserializedEvent.getTaskAttemptID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(),
+          deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getCounters(),
+          deserializedEvent.getCounters());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  /** Round-trips a ContainerLaunchedEvent; checks container, launch time and attempt. */
+  private void testContainerLaunchedEvent() throws Exception {
+    ContainerLaunchedEvent event = new ContainerLaunchedEvent(
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), 100034566,
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    ContainerLaunchedEvent deserializedEvent = (ContainerLaunchedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getLaunchTime(),
+        deserializedEvent.getLaunchTime());
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Checks null events are rejected, then round-trips a single data movement event. */
+  private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
+    VertexDataMovementEventsGeneratedEvent event;
+    // Construction with a null event list must be rejected.
+    try {
+      event = new VertexDataMovementEventsGeneratedEvent(
+          TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
+      Assert.fail("Invalid creation should have errored out");
+    } catch (RuntimeException expected) {
+      // Expected: a null event list is invalid.
+    }
+    List<TezEvent> events =
+        Arrays.asList(new TezEvent(new DataMovementEvent(1, null), new EventMetaData(
+            EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+    event = new VertexDataMovementEventsGeneratedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
+    VertexDataMovementEventsGeneratedEvent deserializedEvent =
+        (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(1,
+        deserializedEvent.getTezEvents().size());
+    Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
+        deserializedEvent.getTezEvents().get(0).getEventType());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a DAGCommitStartedEvent; checks the DAG id. */
+  private void testDAGCommitStartedEvent() throws Exception {
+    DAGCommitStartedEvent event = new DAGCommitStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+    DAGCommitStartedEvent deserializedEvent =
+        (DAGCommitStartedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+    logEvents(event, deserializedEvent);
+  }
+
+  /** Round-trips a VertexCommitStartedEvent; checks the vertex id. */
+  private void testVertexCommitStartedEvent() throws Exception {
+    VertexCommitStartedEvent event = new VertexCommitStartedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+    VertexCommitStartedEvent deserializedEvent =
+        (VertexCommitStartedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    logEvents(event, deserializedEvent);
+  }
+
+  /**
+   * Runs the round-trip check for every {@link HistoryEventType}; a type with no
+   * matching check fails the test so new event types cannot be silently skipped.
+   */
+  @Test
+  public void testDefaultProtoConversion() throws Exception {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      switch (eventType) {
+        case AM_LAUNCHED:
+          testAMLaunchedEvent();
+          break;
+        case AM_STARTED:
+          testAMStartedEvent();
+          break;
+        case DAG_SUBMITTED:
+          testDAGSubmittedEvent();
+          break;
+        case DAG_INITIALIZED:
+          testDAGInitializedEvent();
+          break;
+        case DAG_STARTED:
+          testDAGStartedEvent();
+          break;
+        case DAG_FINISHED:
+          testDAGFinishedEvent();
+          break;
+        case VERTEX_INITIALIZED:
+          testVertexInitializedEvent();
+          break;
+        case VERTEX_STARTED:
+          testVertexStartedEvent();
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          testVertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          testVertexFinishedEvent();
+          break;
+        case TASK_STARTED:
+          testTaskStartedEvent();
+          break;
+        case TASK_FINISHED:
+          testTaskFinishedEvent();
+          break;
+        case TASK_ATTEMPT_STARTED:
+          testTaskAttemptStartedEvent();
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          testTaskAttemptFinishedEvent();
+          break;
+        case CONTAINER_LAUNCHED:
+          testContainerLaunchedEvent();
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          testVertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          testDAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          testVertexCommitStartedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled Event type in Unit tests: " + eventType);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index edcfd9f..b315368 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -87,11 +87,11 @@
         </configuration>
         <executions>
           <execution>
-            <id>package-tez-full</id>
+            <id>package-tez</id>
             <configuration>
-              <finalName>tez-${project.version}-full</finalName>
+              <finalName>tez-${project.version}</finalName>
               <descriptors>
-                <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
+                <descriptor>src/main/assembly/tez-dist.xml</descriptor>
               </descriptors>
               <formats>
                 <format>${package.format}</format>
@@ -103,11 +103,11 @@
             </goals>
           </execution>
           <execution>
-            <id>package-tez</id>
+            <id>package-tez-full</id>
             <configuration>
-              <finalName>tez-${project.version}</finalName>
+              <finalName>tez-${project.version}-full</finalName>
               <descriptors>
-                <descriptor>src/main/assembly/tez-dist.xml</descriptor>
+                <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
               </descriptors>
               <formats>
                 <format>${package.format}</format>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 03f28be..e919516 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -47,10 +47,12 @@ public class MROutputCommitter extends OutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(MROutputCommitter.class);
 
+  private OutputCommitterContext context;
   private org.apache.hadoop.mapreduce.OutputCommitter committer = null;
   private JobContext jobContext = null;
   private volatile boolean initialized = false;
   private JobConf jobConf = null;
+  private boolean newApiCommitter;
 
   @Override
   public void initialize(OutputCommitterContext context) throws IOException {
@@ -66,7 +68,8 @@ public class MROutputCommitter extends OutputCommitter {
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());
-    committer = getOutputCommitter(context);
+    this.context = context;
+    committer = getOutputCommitter(this.context);
     jobContext = getJobContextFromVertexContext(context);
     initialized = true;
   }
@@ -101,7 +104,7 @@ public class MROutputCommitter extends OutputCommitter {
       getOutputCommitter(OutputCommitterContext context) {
 
     org.apache.hadoop.mapreduce.OutputCommitter committer = null;
-    boolean newApiCommitter = false;
+    newApiCommitter = false;
     if (jobConf.getBoolean("mapred.reducer.new-api", false)
         || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
@@ -118,8 +121,8 @@ public class MROutputCommitter extends OutputCommitter {
       TaskAttemptID taskAttemptID = new TaskAttemptID(
           Long.toString(context.getApplicationId().getClusterTimestamp()),
           context.getApplicationId().getId(),
-          (jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
-              false) ? TaskType.MAP : TaskType.REDUCE),
+          ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+              TaskType.MAP : TaskType.REDUCE)),
           0, context.getDAGAttemptNumber());
 
       TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
@@ -179,6 +182,48 @@ public class MROutputCommitter extends OutputCommitter {
 
   }
 
+  /**
+   * Reports whether task-level recovery is supported, as answered by the
+   * wrapped MapReduce committer's {@code isRecoverySupported()}.
+   *
+   * @throws IllegalStateException if {@link #initialize} has not completed
+   */
+  @Override
+  public boolean isTaskRecoverySupported() {
+    if (!initialized) {
+      throw new IllegalStateException("Committer not initialized");
+    }
+    return committer.isRecoverySupported();
+  }
 
+  /**
+   * Recovers one task attempt by building a MapReduce
+   * {@link TaskAttemptContext} for it and delegating to the wrapped
+   * committer's {@code recoverTask}.
+   *
+   * @param taskIndex task number placed in the MapReduce task attempt ID
+   * @param attemptId attempt number placed in the MapReduce task attempt ID
+   * @throws IOException if the wrapped committer's recovery fails
+   * @throws IllegalStateException if {@link #initialize} has not completed
+   */
+  @Override
+  public void recoverTask(int taskIndex, int attemptId) throws IOException {
+    if (!initialized) {
+      throw new IllegalStateException("Committer not initialized");
+    }
+    // The job identifier here is the cluster timestamp followed by the vertex index.
+    // NOTE(review): presumably this must match the IDs the tasks used when
+    // writing output; confirm against the task-side ID format.
+    TaskAttemptID taskAttemptID = new TaskAttemptID(
+        Long.toString(context.getApplicationId().getClusterTimestamp())
+        + String.valueOf(context.getVertexIndex()),
+        context.getApplicationId().getId(),
+        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+            TaskType.MAP : TaskType.REDUCE)),
+        taskIndex, attemptId);
+    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
+        taskAttemptID);
+    committer.recoverTask(taskContext);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index c05ec57..159934a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -135,7 +135,7 @@ public class EventMetaData implements Writable {
     return "{ producerConsumerType=" + producerConsumerType
         + ", taskVertexName=" + taskVertexName
         + ", edgeVertexName=" + edgeVertexName
-        + ", taskAttemptId=" + taskAttemptID
+        + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
         + " }";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
index 1e9265a..1ee7b44 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -50,6 +50,10 @@ public class InputSpec implements Writable {
     return inputDescriptor;
   }
 
+  public void setInputDescriptor(InputDescriptor inputDescriptor) {
+    this.inputDescriptor = inputDescriptor;
+  }
+
   public int getPhysicalEdgeCount() {
     return physicalEdgeCount;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f4e1957..e0e19c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -454,7 +454,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     context.scheduleVertexTasks(scheduledTasks);
   }
   
-  void schedulePendingTasks() {    
+  void schedulePendingTasks() {
     int numPendingTasks = pendingTasks.size();
     if (numPendingTasks == 0) {
       return;
@@ -526,7 +526,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
     
     this.context = context;
-    
+
     this.slowStartMinSrcCompletionFraction = conf
         .getFloat(
             ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
new file mode 100644
index 0000000..29b6b5e
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+/**
+ * Tests DAG recovery by running {@link MultiAttemptDAG} in a session whose AM
+ * is allowed several attempts, so the DAG must survive deliberate AM crashes
+ * to reach its expected final state.
+ */
+public class TestDAGRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGRecovery.class);
+
+  /** Interval between status polls while waiting on the session or DAG. */
+  private static final long POLL_INTERVAL_MS = 100;
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestDAGRecovery.class.getName() + "-tmpDir";
+  protected static MiniDFSCluster dfsCluster;
+
+  private static TezSession tezSession = null;
+
+  /**
+   * Starts a 3-datanode MiniDFSCluster and a MiniTezCluster allowing up to 4 AM
+   * attempts, then starts the shared TezSession used by all tests with
+   * DAG_RECOVERY_MAX_UNFLUSHED_EVENTS set to 0.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    FileSystem remoteFs = null;
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestDAGRecovery.class.getName(),
+          1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestDAGRecovery", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+
+  /**
+   * Stops the session and both mini clusters so that they are not left
+   * running after this class finishes, even if stopping the session fails.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    } finally {
+      tezSession = null;
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+        miniTezCluster = null;
+      }
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+    }
+  }
+
+  /**
+   * Waits for the shared session to become ready, submits {@code dag}, polls
+   * until it completes and asserts that it finished in {@code finalState}.
+   *
+   * @param dag the DAG to submit
+   * @param finalState the state the DAG is expected to finish in
+   * @throws TezUncheckedException if the session shuts down before it is ready
+   */
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(POLL_INTERVAL_MS);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for " + POLL_INTERVAL_MS + "ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(POLL_INTERVAL_MS);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  /**
+   * Runs {@link MultiAttemptDAG}, whose vertex managers halt the AM until each
+   * vertex's configured attempt number is reached, and expects it to succeed.
+   */
+  @Test(timeout=120000)
+  public void testBasicRecovery() throws Exception {
+    DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
new file mode 100644
index 0000000..65c8383
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test DAG of three vertices (v1 -> v2 -> v3, scatter-gather edges) whose
+ * vertex managers halt the AM until a per-vertex attempt number is reached,
+ * so the DAG can only complete if the AM is restarted and recovers.
+ */
+public class MultiAttemptDAG {
+
+  private static final Log LOG =
+      LogFactory.getLog(MultiAttemptDAG.class);
+
+  static final Resource defaultResource = Resource.newInstance(100, 0);
+  public static final String MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS =
+      "tez.multi-attempt-dag.vertex.num-tasks";
+  public static final int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
+
+  /**
+   * Vertex manager that waits until every source task has completed and then,
+   * depending on the DAG attempt number, either halts the AM or schedules all
+   * of this vertex's tasks.
+   *
+   * <p>The user payload is the decimal attempt number on which the vertex may
+   * run: on an earlier attempt the AM JVM is halted, on exactly that attempt
+   * every task is scheduled, and on a later attempt nothing is scheduled.
+   */
+  public static class FailOnAttemptVertexManagerPlugin
+      implements VertexManagerPlugin {
+    private int numSourceTasks = 0;
+    private final AtomicInteger numCompletions = new AtomicInteger();
+    private VertexManagerPluginContext context;
+    private boolean tasksScheduled = false;
+
+    @Override
+    public void initialize(VertexManagerPluginContext context) {
+      this.context = context;
+      for (String input :
+          context.getInputVertexEdgeProperties().keySet()) {
+        LOG.info("Adding sourceTasks for Vertex " + input);
+        numSourceTasks += context.getVertexNumTasks(input);
+        LOG.info("Current numSourceTasks=" + numSourceTasks);
+      }
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      if (completions != null) {
+        for (Entry<String, List<Integer>> entry : completions.entrySet()) {
+          LOG.info("Received completion events on vertexStarted"
+              + ", vertex=" + entry.getKey()
+              + ", completions=" + entry.getValue().size());
+          numCompletions.addAndGet(entry.getValue().size());
+        }
+      }
+      maybeScheduleTasks();
+    }
+
+    /**
+     * Acts once, the first time all source tasks have reported completion:
+     * halts the AM if this is an earlier attempt than the one named in the
+     * payload, or schedules every task if it is exactly that attempt.
+     */
+    private synchronized void maybeScheduleTasks() {
+      if (numCompletions.get() >= numSourceTasks
+          && !tasksScheduled) {
+        tasksScheduled = true;
+        String payload = new String(context.getUserPayload());
+        int successAttemptId = Integer.parseInt(payload);
+        LOG.info("Checking whether to crash AM or schedule tasks"
+            + ", successfulAttemptID=" + successAttemptId
+            + ", currentAttempt=" + context.getDAGAttemptNumber());
+        if (successAttemptId > context.getDAGAttemptNumber()) {
+          // Crash the AM: halt() exits immediately without running shutdown hooks.
+          Runtime.getRuntime().halt(-1);
+        } else if (successAttemptId == context.getDAGAttemptNumber()) {
+          LOG.info("Scheduling tasks for vertex=" + context.getVertexName());
+          int numTasks = context.getVertexNumTasks(context.getVertexName());
+          List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+          for (int i = 0; i < numTasks; ++i) {
+            scheduledTasks.add(i);
+          }
+          context.scheduleVertexTasks(scheduledTasks);
+        }
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      LOG.info("Received completion events for source task"
+          + ", vertex=" + srcVertexName
+          + ", taskIdx=" + taskId);
+      numCompletions.incrementAndGet();
+      maybeScheduleTasks();
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+      // Nothing to do
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
+      // Do nothing
+    }
+  }
+
+  /**
+   * Creates the three-vertex DAG v1 -> v2 -> v3.
+   *
+   * @param name the DAG name
+   * @param conf optional configuration; when non-null it supplies the
+   *     per-vertex task count ({@link #MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS})
+   *     and is serialized as the payload of the test processors, inputs and
+   *     outputs
+   * @return a DAG in which the manager of vertex vN only schedules tasks on
+   *     DAG attempt N
+   * @throws Exception if building the configuration payload fails
+   */
+  public static DAG createDAG(String name,
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+
+    // Make each vertex manager fail on appropriate attempt
+    setFailOnAttemptVertexManager(v1, 1);
+    setFailOnAttemptVertexManager(v2, 2);
+    setFailOnAttemptVertexManager(v3, 3);
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(createScatterGatherEdge(v1, v2, payload));
+    dag.addEdge(createScatterGatherEdge(v2, v3, payload));
+    return dag;
+  }
+
+  /**
+   * Installs a {@link FailOnAttemptVertexManagerPlugin} on {@code vertex} whose
+   * payload is {@code successAttempt} in decimal.
+   */
+  private static void setFailOnAttemptVertexManager(Vertex vertex,
+      int successAttempt) {
+    vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        FailOnAttemptVertexManagerPlugin.class.getName())
+        .setUserPayload(String.valueOf(successAttempt).getBytes()));
+  }
+
+  /** Creates a persisted, sequential scatter-gather edge using the test I/O. */
+  private static Edge createScatterGatherEdge(Vertex src, Vertex dst,
+      byte[] payload) {
+    return new Edge(src, dst,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload)));
+  }
+
+  /** Creates the DAG under the default name "SimpleVTestDAG". */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleVTestDAG", conf);
+  }
+
+}


[2/4] TEZ-847. Support basic AM recovery. (hitesh)

Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
new file mode 100644
index 0000000..6d5a769
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Recovery-only event recording that the commit of a DAG has started. It is
+ * flagged as a recovery event but not as a history event
+ * ({@link #isHistoryEvent()} returns false).
+ */
+public class DAGCommitStartedEvent implements HistoryEvent {
+
+  private TezDAGID dagID;
+
+  /** Creates an empty event, to be populated by {@link #fromProtoStream}. */
+  public DAGCommitStartedEvent() {
+  }
+
+  public DAGCommitStartedEvent(TezDAGID dagID) {
+    this.dagID = dagID;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.DAG_COMMIT_STARTED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  public DAGCommitStartedProto toProto() {
+    return DAGCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .build();
+  }
+
+  public void fromProto(DAGCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited {@link DAGCommitStartedProto} from the stream.
+   *
+   * @throws IOException if the stream is already at EOF (no message to read)
+   *     or the message cannot be parsed
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      // parseDelimitedFrom returns null at end of stream instead of throwing.
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagID=" + dagID;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index cc9c3ad..38e0702 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
 
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -43,7 +43,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   private TezDAGID dagID;
   private long startTime;
   private long finishTime;
-  private DAGStatus.State state;
+  private DAGState state;
   private String diagnostics;
   private TezCounters tezCounters;
 
@@ -51,7 +51,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   public DAGFinishedEvent(TezDAGID dagId, long startTime,
-      long finishTime, DAGStatus.State state,
+      long finishTime, DAGState state,
       String diagnostics, TezCounters counters) {
     this.dagID = dagId;
     this.startTime = startTime;
@@ -111,22 +111,33 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   public DAGFinishedProto toProto() {
-    return DAGFinishedProto.newBuilder()
-        .setDagId(dagID.toString())
+    DAGFinishedProto.Builder builder = DAGFinishedProto.newBuilder();
+
+    builder.setDagId(dagID.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+
+    return builder.build();
   }
 
   public void fromProto(DAGFinishedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.finishTime = proto.getFinishTime();
-    this.state = DAGStatus.State.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    this.state = DAGState.values()[proto.getState()];
+    if (proto.hasDiagnostics()) {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
   }
 
   @Override
@@ -148,9 +159,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + ((tezCounters == null) ? "null" :
+          (tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " ")));
   }
 
   @Override
@@ -159,4 +170,28 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
         HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
   }
 
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public DAGState getState() {
+    return state;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 20479e6..9b001b6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -94,4 +94,11 @@ public class DAGInitializedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public long getInitTime() {
+    return this.initTime;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 4574753..a1bcdf2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -113,4 +113,11 @@ public class DAGStartedEvent implements HistoryEvent {
         + ", startTime=" + startTime;
   }
 
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 3d24105..853bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -179,4 +179,15 @@ import java.io.OutputStream;
     return this.dagPlan;
   }
 
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 97f3be3..ecb6818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -110,22 +110,31 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   }
 
   public TaskAttemptFinishedProto toProto() {
-    return TaskAttemptFinishedProto.newBuilder()
-        .setTaskAttemptId(taskAttemptId.toString())
+    TaskAttemptFinishedProto.Builder builder =
+        TaskAttemptFinishedProto.newBuilder();
+    builder.setTaskAttemptId(taskAttemptId.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskAttemptFinishedProto proto) {
     this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
     this.finishTime = proto.getFinishTime();
     this.state = TaskAttemptState.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+    if (proto.hasDiagnostics()) {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
+    }
   }
 
   @Override
@@ -149,9 +158,29 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + (tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  public TezCounters getCounters() {
+    return tezCounters;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public TaskAttemptState getState() {
+    return state;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 11a1c62..ba91db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -112,7 +112,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -158,4 +158,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", completedLogs=" + completedLogsUrl;
   }
 
+  public TezTaskAttemptID getTaskAttemptID() {
+    return this.taskAttemptId;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index dac2f9a..74c804c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -26,6 +27,7 @@ import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
 import org.codehaus.jettison.json.JSONArray;
@@ -44,9 +46,11 @@ public class TaskFinishedEvent implements HistoryEvent {
   private long finishTime;
   private TaskState state;
   private TezCounters tezCounters;
+  private TezTaskAttemptID successfulAttemptID;
 
   public TaskFinishedEvent(TezTaskID taskID,
       String vertexName, long startTime, long finishTime,
+      TezTaskAttemptID successfulAttemptID,
       TaskState state, TezCounters counters) {
     this.vertexName = vertexName;
     this.taskID = taskID;
@@ -103,20 +107,31 @@ public class TaskFinishedEvent implements HistoryEvent {
   }
 
   public TaskFinishedProto toProto() {
-    return TaskFinishedProto.newBuilder()
-        .setTaskId(taskID.toString())
+    TaskFinishedProto.Builder builder = TaskFinishedProto.newBuilder();
+    builder.setTaskId(taskID.toString())
         .setState(state.ordinal())
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    if (successfulAttemptID != null) {
+      builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskFinishedProto proto) {
     this.taskID = TezTaskID.fromString(proto.getTaskId());
     this.finishTime = proto.getFinishTime();
     this.state = TaskState.values()[proto.getState()];
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
+    if (proto.hasSuccessfulTaskAttemptId()) {
+      this.successfulAttemptID =
+          TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());
+    }
   }
 
   @Override
@@ -138,9 +153,30 @@ public class TaskFinishedEvent implements HistoryEvent {
         + ", finishTime=" + finishTime
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", successfulAttemptID=" + (successfulAttemptID == null ? "null" :
+            successfulAttemptID.toString())
+        + ", counters=" + ( tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
   }
 
+  public TezTaskID getTaskID() {
+    return taskID;
+  }
+
+  public TaskState getState() {
+    return state;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
+  public TezTaskAttemptID getSuccessfulAttemptID() {
+    return successfulAttemptID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index 9efa357..c2a380b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -91,7 +91,7 @@ public class TaskStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -132,4 +132,16 @@ public class TaskStartedEvent implements HistoryEvent {
         + ", launchTime=" + startTime;
   }
 
+  public TezTaskID getTaskID() {
+    return taskID;
+  }
+
+  public long getScheduledTime() {
+    return scheduledTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
new file mode 100644
index 0000000..564f9ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VertexCommitStartedEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+
+  public VertexCommitStartedEvent() {
+  }
+
+  public VertexCommitStartedEvent(TezVertexID vertexId) {
+    this.vertexID = vertexId;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_COMMIT_STARTED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  public VertexCommitStartedProto toProto() {
+    return VertexCommitStartedProto.newBuilder()
+        .setVertexId(vertexID.toString())
+        .build();
+  }
+
+  public void fromProto(VertexCommitStartedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID;
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 7c2a16f..035c9ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -206,4 +206,12 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
 
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public List<TezEvent> getTezEvents() {
+    return this.events;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 0f2b8a1..2366eb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
 
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
@@ -45,13 +45,13 @@ public class VertexFinishedEvent implements HistoryEvent {
   private long startRequestedTime;
   private long startTime;
   private long finishTime;
-  private VertexStatus.State state;
+  private VertexState state;
   private String diagnostics;
   private TezCounters tezCounters;
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
-      VertexStatus.State state, String diagnostics,
+      VertexState state, String diagnostics,
       TezCounters counters) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
@@ -111,24 +111,32 @@ public class VertexFinishedEvent implements HistoryEvent {
   }
 
   public VertexFinishedProto toProto() {
-    return VertexFinishedProto.newBuilder()
-        .setVertexName(vertexName)
+    VertexFinishedProto.Builder builder = VertexFinishedProto.newBuilder();
+    builder.setVertexName(vertexName)
         .setVertexId(vertexID.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    return builder.build();
   }
 
   public void fromProto(VertexFinishedProto proto) {
     this.vertexName = proto.getVertexName();
     this.vertexID = TezVertexID.fromString(proto.getVertexId());
     this.finishTime = proto.getFinishTime();
-    this.state = VertexStatus.State.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    this.state = VertexState.values()[proto.getState()];
+    if (proto.hasDiagnostics())  {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
   }
 
   @Override
@@ -154,9 +162,28 @@ public class VertexFinishedEvent implements HistoryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + ( tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public VertexState getState() {
+    return this.state;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 21c7587..e9e4a8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,12 +18,19 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -31,28 +38,35 @@ import org.codehaus.jettison.json.JSONObject;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class VertexInitializedEvent implements HistoryEvent {
 
+  private static final Log LOG = LogFactory.getLog(VertexInitializedEvent.class);
+
   private TezVertexID vertexID;
   private String vertexName;
   private long initRequestedTime;
   private long initedTime;
-  private long numTasks;
+  private int numTasks;
   private String processorName;
+  private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
 
   public VertexInitializedEvent() {
   }
 
   public VertexInitializedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime,
-      long numTasks, String processorName) {
+      int numTasks, String processorName,
+      Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
     this.initedTime = initedTime;
     this.numTasks = numTasks;
     this.processorName = processorName;
+    this.additionalInputs = additionalInputs;
   }
 
   @Override
@@ -107,8 +121,23 @@ public class VertexInitializedEvent implements HistoryEvent {
   }
 
   public RecoveryProtos.VertexInitializedProto toProto() {
-    return RecoveryProtos.VertexInitializedProto.newBuilder()
-        .setVertexId(vertexID.toString())
+    VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
+    if (additionalInputs != null
+      && !additionalInputs.isEmpty()) {
+      for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+        additionalInputs.values()) {
+        RootInputLeafOutputProto.Builder inputBuilder
+            = RootInputLeafOutputProto.newBuilder();
+        inputBuilder.setName(input.getEntityName());
+        if (input.getInitializerClassName() != null) {
+          inputBuilder.setInitializerClassName(input.getInitializerClassName());
+        }
+        inputBuilder.setEntityDescriptor(
+            DagTypeConverters.convertToDAGPlan(input.getDescriptor()));
+        builder.addInputs(inputBuilder.build());
+      }
+    }
+    return builder.setVertexId(vertexID.toString())
         .setVertexName(vertexName)
         .setInitRequestedTime(initRequestedTime)
         .setInitTime(initedTime)
@@ -122,6 +151,20 @@ public class VertexInitializedEvent implements HistoryEvent {
     this.initRequestedTime = proto.getInitRequestedTime();
     this.initedTime = proto.getInitTime();
     this.numTasks = proto.getNumTasks();
+    if (proto.getInputsCount() > 0) {
+      this.additionalInputs =
+          new LinkedHashMap<String, RootInputLeafOutputDescriptor<InputDescriptor>>();
+      for (RootInputLeafOutputProto inputProto : proto.getInputsList()) {
+        RootInputLeafOutputDescriptor<InputDescriptor> input =
+            new RootInputLeafOutputDescriptor<InputDescriptor>(
+                inputProto.getName(),
+                DagTypeConverters.convertInputDescriptorFromDAGPlan(
+                    inputProto.getEntityDescriptor()),
+                inputProto.hasInitializerClassName() ?
+                    inputProto.getInitializerClassName() : null);
+        additionalInputs.put(input.getEntityName(), input);
+      }
+    }
   }
 
   @Override
@@ -143,7 +186,32 @@ public class VertexInitializedEvent implements HistoryEvent {
         + ", initRequestedTime=" + initRequestedTime
         + ", initedTime=" + initedTime
         + ", numTasks=" + numTasks
-        + ", processorName=" + processorName;
+        + ", processorName=" + processorName
+        + ", additionalInputsCount="
+        + (additionalInputs != null ? additionalInputs.size() : 0);
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public long getInitRequestedTime() {
+    return initRequestedTime;
+  }
+
+  public long getInitedTime() {
+    return initedTime;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
   }
 
+  public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+    return additionalInputs;
+  }
+
+  public String getProcessorName() {
+    return processorName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
new file mode 100644
index 0000000..43cc787
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class VertexParallelismUpdatedEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+  private int numTasks;
+  private VertexLocationHint vertexLocationHint;
+  private Map<String, EdgeManagerDescriptor> sourceEdgeManagers;
+
+  public VertexParallelismUpdatedEvent() {
+  }
+
+  public VertexParallelismUpdatedEvent(TezVertexID vertexID,
+      int numTasks, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    this.vertexID = vertexID;
+    this.numTasks = numTasks;
+    this.vertexLocationHint = vertexLocationHint;
+    this.sourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
+        + " not a History event");
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  public VertexParallelismUpdatedProto toProto() {
+    VertexParallelismUpdatedProto.Builder builder =
+        VertexParallelismUpdatedProto.newBuilder();
+    builder.setVertexId(vertexID.toString())
+        .setNumTasks(numTasks);
+    if (vertexLocationHint != null) {
+      builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
+            this.vertexLocationHint));
+    }
+    if (sourceEdgeManagers != null) {
+      for (Entry<String, EdgeManagerDescriptor> entry :
+          sourceEdgeManagers.entrySet()) {
+        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+            EdgeManagerDescriptorProto.newBuilder();
+        edgeMgrBuilder.setEdgeName(entry.getKey());
+        edgeMgrBuilder.setEntityDescriptor(
+            DagTypeConverters.convertToDAGPlan(entry.getValue()));
+        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  public void fromProto(VertexParallelismUpdatedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    this.numTasks = proto.getNumTasks();
+    if (proto.hasVertexLocationHint()) {
+      this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
+          proto.getVertexLocationHint());
+    }
+    if (proto.getEdgeManagerDescriptorsCount() > 0) {
+      this.sourceEdgeManagers = new HashMap<String, EdgeManagerDescriptor>(
+          proto.getEdgeManagerDescriptorsCount());
+      for (EdgeManagerDescriptorProto edgeManagerProto :
+        proto.getEdgeManagerDescriptorsList()) {
+        EdgeManagerDescriptor edgeManagerDescriptor =
+            DagTypeConverters.convertEdgeManagerDescriptorFromDAGPlan(
+                edgeManagerProto.getEntityDescriptor());
+        sourceEdgeManagers.put(edgeManagerProto.getEdgeName(),
+            edgeManagerDescriptor);
+      }
+    }
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID
+        + ", numTasks=" + numTasks
+        + ", vertexLocationHint=" +
+        (vertexLocationHint == null? "null" : vertexLocationHint)
+        + ", edgeManagersCount=" +
+        (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size());
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public VertexLocationHint getVertexLocationHint() {
+    return vertexLocationHint;
+  }
+
+  public Map<String, EdgeManagerDescriptor> getSourceEdgeManagers() {
+    return sourceEdgeManagers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 6bb383d..e6023f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -88,7 +88,7 @@ public class VertexStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -128,4 +128,16 @@ public class VertexStartedEvent implements HistoryEvent {
         + ", startedTime=" + startTime;
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public long getStartRequestedTime() {
+    return startRequestedTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 807ad81..5986657 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,9 +34,11 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +51,7 @@ public class RecoveryService extends AbstractService {
   private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
+  private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
 
   private Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -60,9 +63,12 @@ public class RecoveryService extends AbstractService {
   Path recoveryPath;
   Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
       HashMap<TezDAGID, FSDataOutputStream>();
-  // FSDataOutputStream metaInfoStream;
   private int bufferSize;
   private FSDataOutputStream summaryStream;
+  private int unflushedEventsCount = 0;
+  private long lastFlushTime = -1;
+  private int maxUnflushedEvents;
+  private int flushInterval;
 
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
@@ -76,11 +82,17 @@ public class RecoveryService extends AbstractService {
     recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
     bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+    flushInterval = conf.getInt(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS,
+        TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
+    maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
+        TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
   }
 
   @Override
   public void serviceStart() {
     LOG.info("Starting RecoveryService");
+    lastFlushTime = appContext.getClock().getTime();
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -128,18 +140,21 @@ public class RecoveryService extends AbstractService {
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
     }
+
     if (summaryStream != null) {
       try {
-        summaryStream.flush();
+        LOG.info("Closing Summary Stream");
+        summaryStream.hsync();
         summaryStream.close();
       } catch (IOException ioe) {
         LOG.warn("Error when closing summary stream", ioe);
       }
     }
-    for (FSDataOutputStream outputStream : outputStreamMap.values()) {
+    for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
       try {
-        outputStream.flush();
-        outputStream.close();
+        LOG.info("Closing Output Stream for DAG " + entry.getKey());
+        entry.getValue().hsync();
+        entry.getValue().close();
       } catch (IOException ioe) {
         LOG.warn("Error when closing output stream", ioe);
       }
@@ -152,38 +167,44 @@ public class RecoveryService extends AbstractService {
           + event.getHistoryEvent().getEventType());
       return;
     }
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (!started.get()) {
+      LOG.warn("Adding event of type " + eventType
+          + " to queue as service not started");
       eventQueue.add(event);
       return;
     }
-    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
     if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
       || eventType.equals(HistoryEventType.DAG_FINISHED)) {
       // handle submissions and completion immediately
       synchronized (lock) {
         try {
           handleEvent(event);
-          summaryStream.flush();
+          summaryStream.hsync();
           if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-            outputStreamMap.get(event.getDagID()).flush();
+            if (outputStreamMap.containsKey(event.getDagID())) {
+              doFlush(outputStreamMap.get(event.getDagID()),
+                  appContext.getClock().getTime(), true);
+            }
           } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
             completedDAGs.add(event.getDagID());
             if (outputStreamMap.containsKey(event.getDagID())) {
               try {
-                outputStreamMap.get(event.getDagID()).flush();
+                doFlush(outputStreamMap.get(event.getDagID()),
+                    appContext.getClock().getTime(), true);
                 outputStreamMap.get(event.getDagID()).close();
                 outputStreamMap.remove(event.getDagID());
               } catch (IOException ioe) {
                 LOG.warn("Error when trying to flush/close recovery file for"
                     + " dag, dagId=" + event.getDagID());
+                // FIXME handle error ?
               }
-            } else {
-              // TODO this is an error
             }
           }
         } catch (Exception e) {
-            // TODO handle failures - treat as fatal or ignore?
-            LOG.warn("Error handling recovery event", e);
+          // FIXME handle failures
+          LOG.warn("Error handling recovery event", e);
         }
       }
       LOG.info("DAG completed"
@@ -191,6 +212,9 @@ public class RecoveryService extends AbstractService {
           + ", queueSize=" + eventQueue.size());
     } else {
       // All other events just get queued
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Queueing Recovery event of type " + eventType.name());
+      }
       eventQueue.add(event);
     }
   }
@@ -206,27 +230,42 @@ public class RecoveryService extends AbstractService {
       // AM event
       // anything to be done?
       // TODO
+      LOG.info("Skipping Recovery Event as DAG is null"
+          + ", eventType=" + event.getHistoryEvent().getEventType());
       return;
     }
 
     TezDAGID dagID = event.getDagID();
-    if (completedDAGs.contains(dagID)) {
-      // Skip events for completed DAGs
+    if (completedDAGs.contains(dagID)
+        || skippedDAGs.contains(dagID)) {
+      // Skip events for completed and skipped DAGs
       // no need to recover completed DAGs
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping Recovery Event as either completed or skipped"
+            + ", dagId=" + dagID
+            + ", completed=" + completedDAGs.contains(dagID)
+            + ", skipped=" + skippedDAGs.contains(dagID)
+            + ", eventType=" + event.getHistoryEvent().getEventType());
+      }
       return;
     }
 
     try {
 
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        if (summaryStream == null) {
-          Path summaryPath = new Path(recoveryPath,
-              appContext.getApplicationID()
-              + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+      if (summaryStream == null) {
+        Path summaryPath = new Path(recoveryPath,
+            appContext.getApplicationID()
+                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+        if (!recoveryDirFS.exists(summaryPath)) {
           summaryStream = recoveryDirFS.create(summaryPath, false,
               bufferSize);
+        } else {
+          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
         }
+      }
+
+      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
+          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
         if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
           DAGSubmittedEvent dagSubmittedEvent =
               (DAGSubmittedEvent) event.getHistoryEvent();
@@ -235,33 +274,97 @@ public class RecoveryService extends AbstractService {
               && dagName.startsWith(
               TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
             // Skip recording pre-warm DAG events
+            skippedDAGs.add(dagID);
             return;
           }
-          Path dagFilePath = new Path(recoveryPath,
-              dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-          FSDataOutputStream outputStream =
-              recoveryDirFS.create(dagFilePath, false, bufferSize);
-          outputStreamMap.put(dagID, outputStream);
         }
+        SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Writing recovery event to summary stream"
+              + ", dagId=" + dagID
+              + ", type="
+              + event.getHistoryEvent().getEventType());
+        }
+        summaryEvent.toSummaryProtoStream(summaryStream);
+      }
 
-        if (outputStreamMap.containsKey(dagID)) {
-          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
-          summaryEvent.toSummaryProtoStream(summaryStream);
+      if (!outputStreamMap.containsKey(dagID)) {
+        Path dagFilePath = new Path(recoveryPath,
+            dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+        FSDataOutputStream outputStream;
+        if (recoveryDirFS.exists(dagFilePath)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening DAG recovery file in append mode"
+                + ", filePath=" + dagFilePath);
+          }
+          outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening DAG recovery file in create mode"
+                + ", filePath=" + dagFilePath);
+          }
+          outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
         }
+        outputStreamMap.put(dagID, outputStream);
       }
 
       FSDataOutputStream outputStream = outputStreamMap.get(dagID);
-      if (outputStream == null) {
-        return;
-      }
 
-      outputStream.write(event.getHistoryEvent().getEventType().ordinal());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing recovery event to output stream"
+            + ", dagId=" + dagID
+            + ", type="
+            + event.getHistoryEvent().getEventType());
+      }
+      ++unflushedEventsCount;
+      outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
       event.getHistoryEvent().toProtoStream(outputStream);
+      if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+          HistoryEventType.DAG_FINISHED).contains(eventType)) {
+        maybeFlush(outputStream);
+      }
     } catch (IOException ioe) {
-      // TODO handle failures - treat as fatal or ignore?
+      // FIXME handle failures
       LOG.warn("Failed to write to stream", ioe);
     }
 
   }
 
+  /**
+   * Flushes the given DAG recovery stream (without sync) when either enough
+   * events have accumulated or enough time has passed since the last flush.
+   * <p>
+   * A flush happens if {@code unflushedEventsCount} has reached
+   * {@code maxUnflushedEvents}, or if {@code flushInterval} is non-negative
+   * and the clock has advanced by at least {@code flushInterval * 1000}
+   * since {@code lastFlushTime}. NOTE(review): the interval is presumably
+   * in seconds against a millisecond clock; confirm at the config site.
+   *
+   * @param outputStream the DAG recovery stream the latest event went to
+   * @throws IOException if the underlying flush fails
+   */
+  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
+    long currentTime = appContext.getClock().getTime();
+    boolean eventLimitReached = unflushedEventsCount >= maxUnflushedEvents;
+    // Multiply by a long literal so a large interval cannot overflow int
+    // arithmetic when scaled to milliseconds.
+    boolean intervalElapsed = flushInterval >= 0
+        && (currentTime - lastFlushTime) >= (flushInterval * 1000L);
+    if (!eventLimitReached && !intervalElapsed) {
+      return;
+    }
+    doFlush(outputStream, currentTime, false);
+  }
+
+  /**
+   * Flushes a recovery stream and resets the flush bookkeeping.
+   *
+   * @param outputStream stream to flush
+   * @param currentTime clock time recorded as the new {@code lastFlushTime}
+   * @param sync {@code true} to call {@code hsync()}, {@code false} to call
+   *     {@code hflush()}
+   * @throws IOException if the flush fails; in that case the unflushed
+   *     counter and last flush time are left unchanged
+   */
+  private void doFlush(FSDataOutputStream outputStream,
+      long currentTime, boolean sync) throws IOException {
+    if (sync) {
+      outputStream.hsync();
+    } else {
+      outputStream.hflush();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // lastFlushTime is an absolute clock value; the elapsed time is
+      // logged separately so the two are not confused.
+      LOG.debug("Flushing output stream"
+          + ", lastFlushTime=" + lastFlushTime
+          + ", timeSinceLastFlush=" + (currentTime - lastFlushTime)
+          + ", unflushedEventsCount=" + unflushedEventsCount
+          + ", maxUnflushedEvents=" + maxUnflushedEvents
+          + ", currentTime=" + currentTime
+          + ", sync=" + sync);
+    }
+
+    unflushedEventsCount = 0;
+    lastFlushTime = currentTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
deleted file mode 100644
index 5b87bc8..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.recovery;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.AMLaunchedEvent;
-import org.apache.tez.dag.history.events.AMStartedEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-
-/**
- * Debugging utility that lists a recovery directory (non-recursively) and
- * logs the contents of every DAG recovery file and summary file found there.
- */
-public class RecoveryParser {
-
-  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
-
-  Path recoveryDirectory;
-  FileSystem recoveryDirFS;
-
-  /**
-   * @param recoveryDirectory directory holding the recovery files
-   * @param conf configuration used to resolve the directory's FileSystem
-   * @throws IOException if the FileSystem for the directory URI cannot be
-   *     obtained
-   */
-  public RecoveryParser(Path recoveryDirectory, Configuration conf)
-      throws IOException {
-    this.recoveryDirectory = recoveryDirectory;
-    recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf);
-
-  }
-
-  /**
-   * Parses each file directly under the recovery directory, choosing the
-   * parser by file-name suffix; files with any other suffix are logged and
-   * skipped.
-   * NOTE(review): the input streams opened here are never closed.
-   */
-  public void parse() throws IOException {
-    RemoteIterator<LocatedFileStatus> locatedFilesStatus =
-        recoveryDirFS.listFiles(recoveryDirectory, false);
-    while (locatedFilesStatus.hasNext()) {
-      LocatedFileStatus fileStatus = locatedFilesStatus.next();
-      String fileName = fileStatus.getPath().getName();
-      if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) {
-        FSDataInputStream inputStream =
-            recoveryDirFS.open(fileStatus.getPath());
-        LOG.info("Parsing DAG file " + fileName);
-        parseDAGRecoveryFile(inputStream);
-      } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) {
-        FSDataInputStream inputStream =
-            recoveryDirFS.open(fileStatus.getPath());
-        LOG.info("Parsing Summary file " + fileName);
-        parseSummaryFile(inputStream);
-      } else {
-        LOG.warn("Encountered unknown file in recovery dir, fileName="
-            + fileName);
-        continue;
-      }
-    }
-  }
-
-  /**
-   * Logs each length-delimited SummaryEventProto read from the stream while
-   * available() reports remaining bytes.
-   * NOTE(review): {@code counter} is unused, and available() is only an
-   * estimate per the InputStream contract, so it is not a reliable EOF test.
-   */
-  private void parseSummaryFile(FSDataInputStream inputStream)
-      throws IOException {
-    int counter = 0;
-    while (inputStream.available() > 0) {
-      RecoveryProtos.SummaryEventProto proto =
-          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
-      LOG.info("[SUMMARY]"
-          + " dagId=" + proto.getDagId()
-          + ", timestamp=" + proto.getTimestamp()
-          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
-    }
-  }
-
-  /**
-   * Reads DAG recovery records and logs each one. A record is a single
-   * event-type byte (read with read()) followed by the event body, which is
-   * decoded with the matching event's fromProtoStream().
-   * Stops at an out-of-range type value, which includes the -1 that read()
-   * returns at end of stream. Throws IOException for a type that has no case
-   * in the switch below.
-   */
-  private void parseDAGRecoveryFile(FSDataInputStream inputStream)
-      throws IOException {
-    int counter = 0;
-    while (inputStream.available() > 0) {
-      int eventTypeOrdinal = inputStream.read();
-      if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
-          HistoryEventType.values().length) {
-        // Corrupt data
-        // reached end
-        LOG.warn("Corrupt data found when trying to read next event type");
-        break;
-      }
-      HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
-      HistoryEvent event = null;
-      // Instantiate an empty event of the recorded type to deserialize into.
-      switch (eventType) {
-        case AM_LAUNCHED:
-          event = new AMLaunchedEvent();
-          break;
-        case AM_STARTED:
-          event = new AMStartedEvent();
-          break;
-        case DAG_SUBMITTED:
-          event = new DAGSubmittedEvent();
-          break;
-        case DAG_INITIALIZED:
-          event = new DAGInitializedEvent();
-          break;
-        case DAG_STARTED:
-          event = new DAGStartedEvent();
-          break;
-        case DAG_FINISHED:
-          event = new DAGFinishedEvent();
-          break;
-        case CONTAINER_LAUNCHED:
-          event = new ContainerLaunchedEvent();
-          break;
-        case VERTEX_INITIALIZED:
-          event = new VertexInitializedEvent();
-          break;
-        case VERTEX_STARTED:
-          event = new VertexStartedEvent();
-          break;
-        case VERTEX_FINISHED:
-          event = new VertexFinishedEvent();
-          break;
-        case TASK_STARTED:
-          event = new TaskStartedEvent();
-          break;
-        case TASK_FINISHED:
-          event = new TaskFinishedEvent();
-          break;
-        case TASK_ATTEMPT_STARTED:
-          event = new TaskAttemptStartedEvent();
-          break;
-        case TASK_ATTEMPT_FINISHED:
-          event = new TaskAttemptFinishedEvent();
-          break;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexDataMovementEventsGeneratedEvent();
-          break;
-        default:
-          throw new IOException("Invalid data found, unknown event type "
-              + eventType);
-
-      }
-      ++counter;
-      LOG.info("Parsing event from input stream"
-          + ", eventType=" + eventType
-          + ", eventIndex=" + counter);
-      event.fromProtoStream(inputStream);
-      LOG.info("Parsed event from input stream"
-          + ", eventType=" + eventType
-          + ", eventIndex=" + counter
-          + ", event=" + event.toString());
-    }
-  }
-
-  /**
-   * Command-line entry point; argv[0] is the recovery directory to parse.
-   * No argument validation is performed.
-   */
-  public static void main(String argv[]) throws IOException {
-    // TODO clean up with better usage and error handling
-    Configuration conf = new Configuration();
-    String dir = argv[0];
-    RecoveryParser parser = new RecoveryParser(new Path(dir), conf);
-    parser.parse();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e21f5df..65f3aaf 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -56,6 +56,9 @@ message DAGStartedProto {
   optional int64 start_time = 3;
 }
 
+// History record named for the start of a DAG commit; it identifies the
+// DAG only. (Presumably written when a DAG begins committing — confirm
+// against the corresponding event class.)
+message DAGCommitStartedProto {
+  optional string dag_id = 1;
+}
 message DAGFinishedProto {
   optional string dag_id = 1;
   optional int64 finish_time = 2;
@@ -69,7 +72,8 @@ message VertexInitializedProto {
   optional string vertex_id = 2;
   optional int64 init_requested_time = 3;
   optional int64 init_time = 4;
-  optional int64 num_tasks = 5;
+  optional int32 num_tasks = 5;
+  repeated RootInputLeafOutputProto inputs = 6;
 }
 
 message VertexStartedProto {
@@ -79,6 +83,22 @@ message VertexStartedProto {
   optional int64 start_time = 4;
 }
 
+// Associates an edge (by name) with the entity descriptor of its edge
+// manager. Used as the repeated element of VertexParallelismUpdatedProto.
+message EdgeManagerDescriptorProto {
+  optional string edge_name = 1;
+  optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+// Record of a vertex parallelism change: the vertex id, its (new) task
+// count, an optional location hint, and edge manager descriptors for the
+// affected edges.
+message VertexParallelismUpdatedProto {
+  optional string vertex_id = 1;
+  optional int32 num_tasks = 2;
+  optional VertexLocationHintProto vertex_location_hint = 3;
+  repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
+}
+
+// History record named for the start of a vertex commit; it identifies the
+// vertex only.
+message VertexCommitStartedProto {
+  optional string vertex_id = 1;
+}
+
 message VertexFinishedProto {
   optional string vertex_name = 1;
   optional string vertex_id = 2;
@@ -100,6 +120,7 @@ message TaskFinishedProto {
   optional int32 state = 3;
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
+  optional string successful_task_attempt_id = 6;
 }
 
 message TaskAttemptStartedProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index e965bca..0a3b9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.api.client;
 
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.junit.Assert;
@@ -32,7 +33,12 @@ public class TestVertexStatusBuilder {
           VertexStatusBuilder.getProtoState(state);
       VertexStatus.State clientState =
           VertexStatus.getState(stateProto);
-      Assert.assertEquals(state.name(), clientState.name());
+      if (state.equals(VertexState.RECOVERING)) {
+        Assert.assertEquals(clientState.name(),
+            State.NEW.name());
+      } else {
+        Assert.assertEquals(state.name(), clientState.name());
+      }
     }
   }
 


[3/4] TEZ-847. Support basic AM recovery. (hitesh)

Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7fe07af..9572f6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -25,6 +25,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
@@ -91,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -193,6 +196,14 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_KILL_REQUEST,
           new TerminateTransition(KILLED_HELPER))
 
+      .addTransition(TaskAttemptStateInternal.NEW,
+          EnumSet.of(TaskAttemptStateInternal.NEW,
+              TaskAttemptStateInternal.RUNNING,
+              TaskAttemptStateInternal.KILLED,
+              TaskAttemptStateInternal.FAILED,
+              TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.RUNNING,
           TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
@@ -462,6 +473,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
         .installTopology();
 
+  // Most recent state produced by restoreFromEvent(); RecoverTransition
+  // reads it to pick the end state when TA_RECOVER is handled.
+  private TaskAttemptState recoveredState = TaskAttemptState.NEW;
+  // Set when a TASK_ATTEMPT_STARTED event is replayed; replaying
+  // TASK_ATTEMPT_FINISHED before this is true throws.
+  private boolean recoveryStartEventSeen = false;
 
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
@@ -782,6 +795,39 @@ public class TaskAttemptImpl implements TaskAttempt,
     return isRescheduled;
   }
 
+  /**
+   * Rebuilds this attempt's state from one replayed history event.
+   * <p>
+   * TASK_ATTEMPT_STARTED restores the launch time and yields RUNNING.
+   * TASK_ATTEMPT_FINISHED is only accepted after a started event; it restores
+   * finish time, counters, progress, final state and any diagnostics.
+   *
+   * @param historyEvent a TASK_ATTEMPT_STARTED or TASK_ATTEMPT_FINISHED event
+   * @return the state this attempt should be recovered to
+   * @throws RuntimeException on a finished event with no earlier started
+   *     event, or on any other event type
+   */
+  @Override
+  public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_ATTEMPT_STARTED:
+      {
+        TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
+        this.launchTime = tEvent.getStartTime();
+        recoveryStartEventSeen = true;
+        recoveredState = TaskAttemptState.RUNNING;
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
+        this.finishTime = tEvent.getFinishTime();
+        this.reportedStatus.counters = tEvent.getCounters();
+        this.reportedStatus.progress = 1f;
+        this.reportedStatus.state = tEvent.getState();
+        // A recovered finish event may carry no diagnostics; do not insert
+        // a null entry into the diagnostics list.
+        String recoveredDiagnostics = tEvent.getDiagnostics();
+        if (recoveredDiagnostics != null) {
+          this.diagnostics.add(recoveredDiagnostics);
+        }
+        this.recoveredState = tEvent.getState();
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);
@@ -1366,7 +1412,46 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
 
   }
-  
+
+  /**
+   * Handles TA_RECOVER on a NEW attempt by mapping the state rebuilt from
+   * history events onto a final internal state.
+   * <p>
+   * An attempt that was still NEW or RUNNING cannot be resumed yet, so it
+   * ends as FAILED and its task is notified. Attempts that had already
+   * completed go straight to the matching final state; the task is not
+   * notified, since it already learned of them during its own recovery.
+   */
+  protected static class RecoverTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      TaskAttemptState restored = taskAttempt.recoveredState;
+      switch (restored) {
+        case SUCCEEDED:
+          return TaskAttemptStateInternal.SUCCEEDED;
+        case FAILED:
+          return TaskAttemptStateInternal.FAILED;
+        case KILLED:
+          return TaskAttemptStateInternal.KILLED;
+        case NEW:
+        case RUNNING:
+          // FIXME: once running containers can be recovered this attempt
+          // should be resumed instead of failed.
+          // TODO abort taskattempt
+          taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
+              TaskEventType.T_ATTEMPT_FAILED));
+          return TaskAttemptStateInternal.FAILED;
+        default:
+          throw new RuntimeException("Failed to recover from non-handled state"
+              + ", taskAttemptId=" + taskAttempt.getID()
+              + ", state=" + taskAttempt.recoveredState);
+      }
+    }
+
+  }
+
   protected static class TerminatedAfterSuccessTransition implements
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 793c12a..16c063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,7 +64,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -74,11 +77,15 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -130,6 +137,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
 
+  // Recovery related flags
+  // Set once a TASK_STARTED event has been replayed through
+  // restoreFromEvent(); replaying TASK_FINISHED before that throws.
+  boolean recoveryStartEventSeen = false;
+
   private static final StateMachineFactory
                <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
             stateMachineFactory
@@ -142,11 +152,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
-            TaskEventType.T_TERMINATE,
-            new KillNewTransition())
+        TaskEventType.T_TERMINATE,
+        new KillNewTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
 
+    // Recover transition
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.NEW,
+            TaskStateInternal.SCHEDULED,
+            TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+            TaskStateInternal.FAILED, TaskStateInternal.KILLED),
+        TaskEventType.T_RECOVER, new RecoverTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -154,7 +171,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
          TaskEventType.T_TERMINATE,
-             KILL_TRANSITION)
+         KILL_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED,
@@ -190,7 +207,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         KILL_TRANSITION)
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
-
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+        TaskEventType.T_SCHEDULE)
 
     // Transitions from KILL_WAIT state
     .addTransition(TaskStateInternal.KILL_WAIT,
@@ -235,6 +253,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
             TaskEventType.T_ATTEMPT_LAUNCHED))
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        TaskEventType.T_SCHEDULE)
 
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -251,6 +271,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+        TaskEventType.T_SCHEDULE)
 
     // create the topology tables
     .installTopology();
@@ -292,6 +314,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private int finishedAttempts;//finish are total of success, failed and killed
 
   private final boolean leafVertex;
+  // State derived from replayed history events in restoreFromEvent();
+  // RecoverTransition switches on it when T_RECOVER is handled.
+  private TaskState recoveredState = TaskState.NEW;
 
   @Override
   public TaskState getState() {
@@ -507,6 +530,92 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     return diagnostics;
   }
 
+  /**
+   * Builds the attempt object for an attempt being replayed from recovery
+   * data. Despite its name this returns a {@link TaskAttempt}, not an event:
+   * the attempt number comes from the id in the replayed
+   * TASK_ATTEMPT_STARTED event and is handed to {@code createAttempt}.
+   * The attempt's own state is not restored here.
+   *
+   * @param taskAttemptStartedEvent replayed start event for the attempt
+   * @return the attempt returned by {@code createAttempt}
+   */
+  private TaskAttempt createRecoveredEvent(TaskAttemptStartedEvent
+      taskAttemptStartedEvent) {
+    return createAttempt(taskAttemptStartedEvent.getTaskAttemptID().getId());
+  }
+
+  /**
+   * Applies one replayed history event to this task and returns the task
+   * state implied so far.
+   * <p>
+   * Task-level events (TASK_STARTED, TASK_FINISHED) update this task
+   * directly. Attempt-level events (TASK_ATTEMPT_STARTED and
+   * TASK_ATTEMPT_FINISHED) create or look up the attempt and forward the
+   * event to the attempt's own restoreFromEvent().
+   *
+   * @param historyEvent a task or task-attempt start/finish event
+   * @return the recovered task state after applying the event
+   * @throws RuntimeException on TASK_FINISHED without an earlier TASK_STARTED,
+   *     on more attempt completions than starts, on a finish for an unknown
+   *     attempt, or on any other event type
+   */
+  @Override
+  public TaskState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_STARTED:
+      {
+        TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
+        recoveryStartEventSeen = true;
+        this.scheduledTime = tEvent.getScheduledTime();
+        // Swap a null/empty attempts map for a fresh LinkedHashMap so that
+        // replayed attempts can be inserted below. NOTE(review): presumably
+        // the initial map may be immutable; confirm in the constructor.
+        if (this.attempts == null
+            || this.attempts.isEmpty()) {
+          this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
+        }
+        recoveredState = TaskState.SCHEDULED;
+        finishedAttempts = 0;
+        return recoveredState;
+      }
+      case TASK_FINISHED:
+      {
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+        recoveredState = tEvent.getState();
+        // Only a successful finish that names its attempt updates
+        // successfulAttempt.
+        if (tEvent.getState() == TaskState.SUCCEEDED
+            && tEvent.getSuccessfulAttemptID() != null) {
+          successfulAttempt = tEvent.getSuccessfulAttemptID();
+        }
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_STARTED:
+      {
+        // NOTE(review): unlike TASK_FINISHED, this does not check
+        // recoveryStartEventSeen before inserting into the attempts map.
+        TaskAttemptStartedEvent taskAttemptStartedEvent =
+            (TaskAttemptStartedEvent) historyEvent;
+        TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+        recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding restored attempt into known attempts map"
+              + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+        }
+        this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+            recoveredAttempt);
+        ++numberUncompletedAttempts;
+        this.recoveredState = TaskState.RUNNING;
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        // Counters are adjusted before validation; on any of the throws below
+        // they are left in the adjusted state.
+        finishedAttempts++;
+        --numberUncompletedAttempts;
+        if (numberUncompletedAttempts < 0) {
+          throw new RuntimeException("Invalid recovery event for attempt finished"
+              + ", more completions than starts encountered"
+              + ", finishedAttempts=" + finishedAttempts
+              + ", incompleteAttempts=" + numberUncompletedAttempts);
+        }
+        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+            (TaskAttemptFinishedEvent) historyEvent;
+        TaskAttempt taskAttempt = this.attempts.get(
+            taskAttemptFinishedEvent.getTaskAttemptID());
+        if (taskAttempt == null) {
+          throw new RuntimeException("Could not find task attempt"
+              + " when trying to recover"
+              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
+        }
+        TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
+            taskAttemptFinishedEvent);
+        // A succeeded attempt makes the task SUCCEEDED; failed or killed
+        // attempts leave recoveredState unchanged.
+        if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
+          recoveredState = TaskState.SUCCEEDED;
+          successfulAttempt = taskAttempt.getID();
+        }
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
     readLock.lock();
@@ -851,6 +960,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // is called from within a transition
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), getLaunchTime(), clock.getTime(),
+        successfulAttempt,
         TaskState.SUCCEEDED, getCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -858,7 +968,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
-        getVertex().getName(), getLaunchTime(), clock.getTime(),
+        getVertex().getName(), getLaunchTime(), clock.getTime(), null,
         finalState, getCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -992,6 +1102,120 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  /**
+   * Drives a task whose state was rebuilt from a previous AM attempt's
+   * history. Every known attempt is first sent TA_RECOVER. The task's end
+   * state then follows from its recovered state:
+   * <ul>
+   *   <li>NEW: left alone until the vertex schedules the task.</li>
+   *   <li>SCHEDULED / RUNNING / SUCCEEDED: a recorded successful attempt is
+   *       kept only if every output committer of the vertex recovers its
+   *       data. Otherwise the task fails if it already has maxAttempts
+   *       attempts, or stays RUNNING. It schedules a fresh attempt only when
+   *       every recovered attempt has finished.</li>
+   *   <li>KILLED / FAILED: the vertex is told the task completed.</li>
+   * </ul>
+   */
+  private static class RecoverTransition implements
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    @Override
+    public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
+      if (task.attempts != null) {
+        for (TaskAttempt recoveredAttempt : task.attempts.values()) {
+          task.eventHandler.handle(new TaskAttemptEvent(
+              recoveredAttempt.getID(), TaskAttemptEventType.TA_RECOVER));
+        }
+      }
+      LOG.info("Trying to recover task"
+          + ", taskId=" + task.getTaskId()
+          + ", recoveredState=" + task.recoveredState);
+      switch (task.recoveredState) {
+        case NEW:
+          // Nothing to do until the vertex schedules this task.
+          return TaskStateInternal.NEW;
+        case SCHEDULED:
+        case RUNNING:
+        case SUCCEEDED:
+          return recoverInFlightTask(task);
+        case KILLED:
+          informVertexOfCompletion(task, TaskStateInternal.KILLED);
+          return TaskStateInternal.KILLED;
+        case FAILED:
+          informVertexOfCompletion(task, TaskStateInternal.FAILED);
+          return TaskStateInternal.FAILED;
+        default:
+          return TaskStateInternal.NEW;
+      }
+    }
+
+    /**
+     * Handles a task recovered as SCHEDULED, RUNNING or SUCCEEDED.
+     */
+    private static TaskStateInternal recoverInFlightTask(TaskImpl task) {
+      if (task.successfulAttempt != null) {
+        if (recoverCommittedData(task)) {
+          LOG.info("Recovered a successful attempt"
+              + ", taskAttemptId=" + task.successfulAttempt.toString());
+          task.logJobHistoryTaskFinishedEvent();
+          informVertexOfCompletion(task, TaskStateInternal.SUCCEEDED);
+          task.eventHandler.handle(
+              new VertexEventTaskAttemptCompleted(
+                  task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
+          return TaskStateInternal.SUCCEEDED;
+        }
+        // Output data could not be recovered: forget the recorded success and
+        // treat the task like one that never had a successful attempt.
+        task.successfulAttempt = null;
+      }
+
+      if (task.attempts.size() >= task.maxAttempts) {
+        // Exceeded max attempts
+        task.finished(TaskStateInternal.FAILED);
+        return TaskStateInternal.FAILED;
+      }
+
+      // Start a new attempt only once every recovered attempt has finished.
+      // An incomplete attempt is expected to move to failed on recovery, and
+      // its update will trigger a new attempt if possible.
+      if (task.attempts.size() == task.finishedAttempts) {
+        task.addAndScheduleAttempt();
+      }
+      return TaskStateInternal.RUNNING;
+    }
+
+    /**
+     * Asks each output committer of the task's vertex to recover this task's
+     * output from the previous application attempt.
+     *
+     * @return true if every committer recovered the task (trivially true when
+     *     the vertex has no committers). Returns false as soon as one committer
+     *     does not support task recovery or throws while recovering.
+     */
+    private static boolean recoverCommittedData(TaskImpl task) {
+      if (task.getVertex().getOutputCommitters() == null
+          || task.getVertex().getOutputCommitters().isEmpty()) {
+        return true;
+      }
+      for (Entry<String, OutputCommitter> committerEntry
+          : task.getVertex().getOutputCommitters().entrySet()) {
+        String outputName = committerEntry.getKey();
+        LOG.info("Recovering data for task from previous DAG attempt"
+            + ", taskId=" + task.getTaskId()
+            + ", output=" + outputName);
+        OutputCommitter committer = committerEntry.getValue();
+        if (!committer.isTaskRecoverySupported()) {
+          LOG.info("Task recovery not supported by committer"
+              + ", failing task attempt"
+              + ", taskId=" + task.getTaskId()
+              + ", attemptId=" + task.successfulAttempt
+              + ", output=" + outputName);
+          return false;
+        }
+        try {
+          // The previous application attempt is taken to be (current id - 1).
+          committer.recoverTask(task.getTaskId().getId(),
+              task.appContext.getApplicationAttemptId().getAttemptId() - 1);
+        } catch (Exception e) {
+          LOG.warn("Task recovery failed by committer"
+              + ", taskId=" + task.getTaskId()
+              + ", attemptId=" + task.successfulAttempt
+              + ", output=" + outputName, e);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Tells the owning vertex that this task completed in the given state. */
+    private static void informVertexOfCompletion(TaskImpl task,
+        TaskStateInternal finalState) {
+      task.eventHandler.handle(
+          new VertexEventTaskCompleted(task.taskId,
+              getExternalState(finalState)));
+    }
+  }
+
 
   private static class KillWaitAttemptCompletedTransition implements
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d3e07cb..67f978a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -52,11 +52,9 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,13 +92,16 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -110,9 +111,12 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
@@ -185,6 +189,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private int numStartedSourceVertices = 0;
   private int numInitedSourceVertices = 0;
+  private int numRecoveredSourceVertices = 0;
+
   private int distanceFromRoot = 0;
 
   private final List<String> diagnostics = new ArrayList<String>();
@@ -209,6 +215,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
 
+  private VertexState recoveredState = VertexState.NEW;
+  private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
        stateMachineFactory
@@ -222,6 +231,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.INITIALIZING, VertexState.FAILED),
               VertexEventType.V_INIT,
               new InitTransition())
+          .addTransition
+              (VertexState.NEW,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_RECOVER,
+                  new StartRecoverTransition())
+          .addTransition
+              (VertexState.RECOVERING,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+                  new RecoverTransition())
+          .addTransition
+              (VertexState.NEW,
+                  EnumSet.of(VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+                  new RecoverTransition())
           .addTransition(VertexState.NEW, VertexState.NEW,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -367,6 +403,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // reruns.
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
+          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+              new TaskAttemptCompletedEventTransition())
 
           // Transitions from FAILED state
           .addTransition(
@@ -491,6 +530,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private VertexTerminationCause terminationCause;
   
   private String logIdentifier;
+  private boolean recoveryCommitInProgress = false;
+  private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
+
+  // Recovery related flags
+  boolean recoveryInitEventSeen = false;
+  boolean recoveryStartEventSeen = false;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
@@ -515,7 +560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     this.vertexLocationHint = vertexLocationHint;
     if (LOG.isDebugEnabled()) {
-      logLocationHints(this.vertexLocationHint);
+      logLocationHints(this.vertexName, this.vertexLocationHint);
     }
 
     this.dagUgi = appContext.getCurrentDAG().getDagUGI();
@@ -800,6 +845,104 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.appContext;
   }
 
+  /**
+   * Applies a parallelism change replayed during recovery.
+   *
+   * <p>Keeps only the first {@code newParallelism} tasks, in the insertion
+   * order of the {@code tasks} LinkedHashMap, and drops the rest. It also
+   * remembers the recorded source edge managers in
+   * {@code recoveredSourceEdgeManagers}. {@code numTasks} is NOT updated
+   * here; callers set it themselves (restoreFromEvent does so before calling).
+   *
+   * @param newParallelism number of tasks to keep; a value of 0 or less
+   *     removes every task
+   * @param sourceEdgeManagers edge managers recorded with the update, keyed by
+   *     source vertex name (possibly null)
+   */
+  private void handleParallelismUpdate(int newParallelism,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    Iterator<Map.Entry<TezTaskID, Task>> iter =
+        this.tasks.entrySet().iterator();
+    int retained = 0;
+    while (iter.hasNext()) {
+      iter.next();
+      if (retained < newParallelism) {
+        retained++;
+      } else {
+        // Iterator.remove() is the safe way to drop entries mid-iteration.
+        iter.remove();
+      }
+    }
+    this.recoveredSourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  /**
+   * Replays one history event, recorded by a previous application attempt,
+   * onto this vertex and returns the vertex state recovered so far.
+   *
+   * <p>Ordering is validated:
+   * <ul>
+   *   <li>a STARTED event requires an earlier INITIALIZED event;</li>
+   *   <li>a FINISHED event requires an earlier STARTED event;</li>
+   *   <li>a COMMIT_STARTED event requires the recovered state to be RUNNING.</li>
+   * </ul>
+   * Data movement events are buffered in {@code recoveredEvents}.
+   *
+   * @param historyEvent the event to replay
+   * @return the recovered vertex state after applying the event
+   * @throws RuntimeException if the event arrives out of order or is of a
+   *     type that vertex recovery does not handle
+   */
+  @Override
+  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case VERTEX_INITIALIZED:
+        recoveryInitEventSeen = true;
+        // Re-run setup using the recorded init timestamps, then create the
+        // tasks that later events (e.g. parallelism updates) operate on.
+        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
+        createTasks();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Init event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new RuntimeException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
+        startTimeRequested = startedEvent.getStartRequestedTime();
+        startedTime = startedEvent.getStartTime();
+        recoveredState = VertexState.RUNNING;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Started event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_PARALLELISM_UPDATED:
+        VertexParallelismUpdatedEvent updatedEvent =
+            (VertexParallelismUpdatedEvent) historyEvent;
+        // Keep the current location hint if none was recorded with the update.
+        if (updatedEvent.getVertexLocationHint() != null) {
+          vertexLocationHint = updatedEvent.getVertexLocationHint();
+        }
+        numTasks = updatedEvent.getNumTasks();
+        handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after parallelism updated event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_COMMIT_STARTED:
+        if (recoveredState != VertexState.RUNNING) {
+          throw new RuntimeException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case VERTEX_FINISHED:
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        recoveredState = finishedEvent.getState();
+        // Successful vertices are logged with empty diagnostics; only keep
+        // real messages so recovery does not add blank (or null) entries.
+        String finishedDiagnostics = finishedEvent.getDiagnostics();
+        if (finishedDiagnostics != null && !finishedDiagnostics.isEmpty()) {
+          diagnostics.add(finishedDiagnostics);
+        }
+        finishTime = finishedEvent.getFinishTime();
+        // TODO counters ??
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after finished event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        // Buffer the recorded events; the recovered state is unchanged.
+        VertexDataMovementEventsGeneratedEvent vEvent =
+            (VertexDataMovementEventsGeneratedEvent) historyEvent;
+        this.recoveredEvents.addAll(vEvent.getTezEvents());
+        return recoveredState;
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
   // TODO Create InputReadyVertexManager that schedules when there is something
   // to read and use that as default instead of ImmediateStart.TEZ-480
   @Override
@@ -829,8 +972,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
-    writeLock.lock();
+    return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, false);
+  }
+
+  private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
+      boolean recovering) {
+    if (recovering) {
+      writeLock.lock();
+      try {
+        if (sourceEdgeManagers != null) {
+          for(Map.Entry<String, EdgeManagerDescriptor> entry :
+              sourceEdgeManagers.entrySet()) {
+            LOG.info("Recovering edge manager for source:"
+                + entry.getKey() + " destination: " + getVertexId());
+            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
+            Edge edge = sourceVertices.get(sourceVertex);
+            try {
+              edge.setCustomEdgeManager(entry.getValue());
+            } catch (Exception e) {
+              LOG.warn("Failed to initialize edge manager for edge"
+                  + ", sourceVertexName=" + sourceVertex.getName()
+                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
+                  e);
+              return false;
+            }
+          }
+        }
+        return true;
+      } finally {
+        writeLock.unlock();
+      }
+    }
     setVertexLocationHint(vertexLocationHint);
+    writeLock.lock();
     try {
       if (parallelismSet == true) {
         LOG.info("Parallelism can only be set dynamically once per vertex");
@@ -928,7 +1103,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
           }
         }
-  
+
+        VertexParallelismUpdatedEvent parallelismUpdatedEvent =
+            new VertexParallelismUpdatedEvent(vertexId, numTasks,
+                vertexLocationHint,
+                sourceEdgeManagers);
+        appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
+            parallelismUpdatedEvent));
+
         // stop buffering events
         for (Edge edge : sourceVertices.values()) {
           edge.stopEventBuffering();
@@ -962,7 +1144,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     try {
       this.vertexLocationHint = vertexLocationHint;
       if (LOG.isDebugEnabled()) {
-        logLocationHints(this.vertexLocationHint);
+        logLocationHints(this.vertexName, this.vertexLocationHint);
       }
     } finally {
       writeLock.unlock();
@@ -1039,7 +1221,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+  /**
+   * Sends a VertexInitializedEvent for this vertex to the history handler.
+   * The event records the init-requested and inited times, the task count,
+   * the processor name and the additional inputs.
+   */
   void logJobHistoryVertexInitializedEvent() {
     VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
         initTimeRequested, initedTime, numTasks,
-        getProcessorName());
+        getProcessorName(), getAdditionalInputs());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), initEvt));
   }
@@ -1055,13 +1237,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
-        startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
+        startedTime, finishTime, VertexState.SUCCEEDED, "",
         getAllCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
-  void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
+  void logJobHistoryVertexFailedEvent(VertexState state) {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
@@ -1073,7 +1255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   static VertexState checkVertexForCompletion(final VertexImpl vertex) {
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking for vertex completion"
+      LOG.debug("Checking for vertex completion for "
+          + vertex.logIdentifier
+          + ", numTasks=" + vertex.numTasks
           + ", failedTaskCount=" + vertex.failedTaskCount
           + ", killedTaskCount=" + vertex.killedTaskCount
           + ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1084,6 +1268,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
       LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+          + " for vertex " + vertex.logIdentifier
+          + ", numTasks=" + vertex.numTasks
           + ", failedTaskCount=" + vertex.failedTaskCount
           + ", killedTaskCount=" + vertex.killedTaskCount
           + ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1096,6 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
+          if (vertex.outputCommitters != null) {
+            vertex.appContext.getHistoryHandler().handle(
+                new DAGHistoryEvent(vertex.getDAGId(),
+                    new VertexCommitStartedEvent(vertex.vertexId)));
+          }
           if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
             // commit only once. Dont commit shared outputs
             LOG.info("Invoking committer commit for vertex, vertexId="
@@ -1197,20 +1388,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (finishTime == 0) setFinishTime();
 
     switch (finalState) {
-      case KILLED:
-        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-            finalState, terminationCause));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
-        break;
       case ERROR:
         eventHandler.handle(new DAGEvent(getDAGId(),
             DAGEventType.INTERNAL_ERROR));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+        logJobHistoryVertexFailedEvent(finalState);
         break;
+      case KILLED:
       case FAILED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState, terminationCause));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+        logJobHistoryVertexFailedEvent(finalState);
         break;
       case SUCCEEDED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
@@ -1227,58 +1414,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return finished(finalState, null);
   }
 
-  private VertexState initializeVertex() {
+
+  /**
+   * Creates, initializes and sets up an OutputCommitter for every additional
+   * output that names an initializer (committer) class.
+   *
+   * <p>Each committer is registered in {@code outputCommitters} under its
+   * output name. Outputs without an initializer class are skipped. Committer
+   * creation, initialize and setupOutput run inside
+   * {@code dagUgi.doAs(...)}, i.e. as the DAG's user.
+   *
+   * @throws Exception propagated from committer instantiation, initialize or
+   *     setupOutput; initializeVertex treats this as a vertex init failure
+   */
+  private void initializeCommitters() throws Exception {
+    // NOTE(review): this guard tests additionalOutputSpecs, but the loop below
+    // iterates additionalOutputs -- confirm the two are always populated
+    // together, otherwise committers could be skipped.
     if (!this.additionalOutputSpecs.isEmpty()) {
-      try {
-        LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
-        for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
+      LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+      for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
           additionalOutputs.entrySet())  {
-          final String outputName = entry.getKey();
-          final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
-          if (od.getInitializerClassName() == null
+        final String outputName = entry.getKey();
+        final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
+        if (od.getInitializerClassName() == null
             || od.getInitializerClassName().isEmpty()) {
-            LOG.info("Ignoring committer as none specified for output="
-                + outputName
+          LOG.info("Ignoring committer as none specified for output="
+              + outputName
+              + ", vertexId=" + logIdentifier);
+          continue;
+        }
+        LOG.info("Instantiating committer for output=" + outputName
+            + ", vertexId=" + logIdentifier
+            + ", committerClass=" + od.getInitializerClassName());
+
+        // Committer creation and setup run as the DAG's user.
+        dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+                od.getInitializerClassName());
+            OutputCommitterContext outputCommitterContext =
+                new OutputCommitterContextImpl(appContext.getApplicationID(),
+                    appContext.getApplicationAttemptId().getAttemptId(),
+                    appContext.getCurrentDAG().getName(),
+                    vertexName,
+                    outputName,
+                    od.getDescriptor().getUserPayload(),
+                    vertexId.getId());
+
+            LOG.info("Invoking committer init for output=" + outputName
                 + ", vertexId=" + logIdentifier);
-            continue;
+            outputCommitter.initialize(outputCommitterContext);
+            outputCommitters.put(outputName, outputCommitter);
+            LOG.info("Invoking committer setup for output=" + outputName
+                + ", vertexId=" + logIdentifier);
+            outputCommitter.setupOutput();
+            return null;
           }
-          LOG.info("Instantiating committer for output=" + outputName
-              + ", vertexId=" + logIdentifier
-              + ", committerClass=" + od.getInitializerClassName());
-
-          dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
-                  od.getInitializerClassName());
-              OutputCommitterContext outputCommitterContext =
-                  new OutputCommitterContextImpl(appContext.getApplicationID(),
-                      appContext.getApplicationAttemptId().getAttemptId(),
-                      appContext.getCurrentDAG().getName(),
-                      vertexName,
-                      outputName,
-                      od.getDescriptor().getUserPayload());
-
-              LOG.info("Invoking committer init for output=" + outputName
-                  + ", vertexId=" + logIdentifier);
-              outputCommitter.initialize(outputCommitterContext);
-              outputCommitters.put(outputName, outputCommitter);
-              LOG.info("Invoking committer setup for output=" + outputName
-                  + ", vertexId=" + logIdentifier);
-              outputCommitter.setupOutput();
-              return null;
-            }
-          });
-        }
-      } catch (Exception e) {
-        LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
-        addDiagnostic("Vertex init failed : "
-            + StringUtils.stringifyException(e));
-        trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
-        abortVertex(VertexStatus.State.FAILED);
-        return finished(VertexState.FAILED);
+        });
       }
     }
+  }
+
+  private VertexState initializeVertex() {
+    try {
+      initializeCommitters();
+    } catch (Exception e) {
+      LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
+      addDiagnostic("Vertex init failed : "
+          + StringUtils.stringifyException(e));
+      trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+      abortVertex(VertexStatus.State.FAILED);
+      return finished(VertexState.FAILED);
+    }
+
     // TODO: Metrics
     initedTime = clock.getTime();
 
@@ -1329,130 +1523,645 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   }
 
-  public static class InitTransition implements
-      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+  private VertexState setupVertex() {
+    return setupVertex(null); // null event: init time and numTasks come from the clock and vertex plan
+  }
 
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexState vertexState = VertexState.NEW;
-      vertex.numInitedSourceVertices++;
-      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
-          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
-        vertexState = handleInitEvent(vertex, event);
-        if (vertexState != VertexState.FAILED) {
-          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
-            for (Vertex target : vertex.targetVertices.keySet()) {
-              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
-                VertexEventType.V_INIT));
-            }
-          }
+  private VertexState setupVertex(VertexInitializedEvent event) {
+    // A non-null event supplies init times, numTasks and additional inputs; null uses clock/plan.
+    if (event == null) {
+      initTimeRequested = clock.getTime();
+    } else {
+      initTimeRequested = event.getInitRequestedTime();
+      initedTime = event.getInitedTime();
+    }
+
+    // VertexManager needs to be setup before attempting to Initialize any
+    // Inputs - since events generated by them will be routed to the
+    // VertexManager for handling.
+
+    if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
+      List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
+      for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
+        if (groupInfo.edgeMergedInputs.containsKey(getName())) {
+          InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
+          groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
+              Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
         }
       }
-      return vertexState;
+      if (!groupSpecList.isEmpty()) {
+        groupInputSpecList = groupSpecList;
+      }
     }
 
-    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
-      vertex.initTimeRequested = vertex.clock.getTime();
-
-      // VertexManager needs to be setup before attempting to Initialize any
-      // Inputs - since events generated by them will be routed to the
-      // VertexManager for handling.
-
-      if (vertex.dagVertexGroups != null && !vertex.dagVertexGroups.isEmpty()) {
-        List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
-        for (VertexGroupInfo groupInfo : vertex.dagVertexGroups.values()) {
-          if (groupInfo.edgeMergedInputs.containsKey(vertex.getName())) {
-            InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(vertex.getName());
-            groupSpecList.add(new GroupInputSpec(groupInfo.groupName, 
-                Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
+    // Check if any inputs need initializers
+    if (event != null) {
+      this.additionalInputs = event.getAdditionalInputs();
+      if (additionalInputs != null) {
+        // FIXME References to descriptor kept in both objects
+        for (InputSpec inputSpec : this.additionalInputSpecs) {
+          if (additionalInputs.containsKey(inputSpec.getSourceVertexName())
+                && additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor() != null) {
+            inputSpec.setInputDescriptor(
+                additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor());
           }
         }
-        if (!groupSpecList.isEmpty()) {
-          vertex.groupInputSpecList = groupSpecList;
-        }
       }
-      
-      // Check if any inputs need initializers
-      if (vertex.additionalInputs != null) {
-        LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
-            + vertex.additionalInputs);
-        for (RootInputLeafOutputDescriptor<InputDescriptor> input : vertex.additionalInputs.values()) {
+    } else {
+      if (additionalInputs != null) {
+        LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+            + additionalInputs);
+        for (RootInputLeafOutputDescriptor<InputDescriptor> input : additionalInputs.values()) {
           if (input.getInitializerClassName() != null) {
-            if (vertex.inputsWithInitializers == null) {
-              vertex.inputsWithInitializers = Sets.newHashSet();
+            if (inputsWithInitializers == null) {
+              inputsWithInitializers = Sets.newHashSet();
             }
-            vertex.inputsWithInitializers.add(input.getEntityName());
+            inputsWithInitializers.add(input.getEntityName());
             LOG.info("Starting root input initializer for input: "
                 + input.getEntityName() + ", with class: ["
                 + input.getInitializerClassName() + "]");
           }
         }
       }
+    }
+
+    boolean hasBipartite = false;
+    if (sourceVertices != null) {
+      for (Edge edge : sourceVertices.values()) {
+        if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+          hasBipartite = true;
+          break;
+        }
+      }
+    }
+
+    if (hasBipartite && inputsWithInitializers != null) {
+      LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
+      if (event != null) {
+        return VertexState.FAILED;
+      } else {
+        return finished(VertexState.FAILED);
+      }
+    }
+
+    boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
+
+    if (hasUserVertexManager) {
+      VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
+          .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
+              .getVertexManagerPlugin());
+      LOG.info("Setting user vertex manager plugin: "
+          + pluginDesc.getClassName() + " on vertex: " + getName());
+      vertexManager = new VertexManager(pluginDesc, this, appContext);
+    } else {
+      if (hasBipartite) {
+        // setup vertex manager
+        // TODO this needs to consider data size and perhaps API.
+        // Currently implicitly BIPARTITE is the only edge type
+        LOG.info("Setting vertexManager to ShuffleVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new ShuffleVertexManager(),
+            this, appContext);
+      } else if (inputsWithInitializers != null) {
+        LOG.info("Setting vertexManager to RootInputVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new RootInputVertexManager(),
+            this, appContext);
+      } else {
+        // schedule all tasks upon vertex start. Default behavior.
+        LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(
+            new ImmediateStartVertexManager(), this, appContext);
+      }
+    }
+
+    vertexManager.initialize();
+
+    // Setup tasks early if possible. If the VertexManager is not being used
+    // to set parallelism, sending events to Tasks is safe (and less confusing
+    // than relying on tasks to be created after TaskEvents are generated).
+    // For VertexManagers setting parallelism, the setParallelism call needs
+    // to be inline.
+    if (event != null) {
+      numTasks = event.getNumTasks();
+    } else {
+      numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+    }
+
+    if (!(numTasks == -1 || numTasks >= 0)) {
+      addDiagnostic("Invalid task count for vertex"
+          + ", numTasks=" + numTasks);
+      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
+      if (event != null) { // same convention as the bipartite check above
+        return VertexState.FAILED;
+      } else {
+        abortVertex(VertexStatus.State.FAILED);
+        return finished(VertexState.FAILED);
+      }
+    }
+
+    checkTaskLimits();
+    return VertexState.INITED;
+  }
+
+  public static class StartRecoverTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    // Applies the DAG-provided desired state, else resumes from vertex.recoveredState.
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent;
+      VertexState desiredState = recoverEvent.getDesiredState();
 
-      boolean hasBipartite = false;
-      if (vertex.sourceVertices != null) {
-        for (Edge edge : vertex.sourceVertices.values()) {
-          if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-            hasBipartite = true;
+      switch (desiredState) {
+        case RUNNING:
+          break;
+        case SUCCEEDED:
+        case KILLED:
+        case FAILED:
+        case ERROR:
+          switch (desiredState) {
+            case SUCCEEDED:
+              vertex.succeededTaskCount = vertex.numTasks;
+              vertex.completedTaskCount = vertex.numTasks;
+              break;
+            case KILLED:
+              vertex.killedTaskCount = vertex.numTasks;
+              break;
+            case FAILED:
+            case ERROR:
+              vertex.failedTaskCount = vertex.numTasks;
+              break;
+          }
+          if (vertex.tasks != null) {
+            TaskState taskState = TaskState.KILLED;
+            switch (desiredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+              case ERROR:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+          }
+          LOG.info("DAG informed Vertex of its final completed state"
+              + ", vertex=" + vertex.logIdentifier
+              + ", state=" + desiredState);
+          return desiredState;
+        default:
+          LOG.info("Unhandled desired state provided by DAG"
+              + ", vertex=" + vertex.logIdentifier
+              + ", state=" + desiredState);
+          return vertex.finished(VertexState.ERROR); // do not fall through into recoveredState handling
+      }
+
+      VertexState endState;
+      switch (vertex.recoveredState) {
+        case NEW:
+          // Trigger init and start as desired state is RUNNING
+          // Drop all root events
+          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+          while (iterator.hasNext()) {
+            if (iterator.next().getEventType().equals(
+                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+              iterator.remove();
+            }
+          }
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_INIT));
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          endState = VertexState.NEW;
+          break;
+        case INITED:
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+
+          // Recover tasks
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+          }
+          // Update tasks with their input payloads as needed
+
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          if (vertex.getInputVertices().isEmpty()) {
+            endState = VertexState.INITED;
+          } else {
+            endState = VertexState.RECOVERING;
+          }
+          break;
+        case RUNNING:
+          vertex.tasksNotYetScheduled = false;
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
             break;
           }
+
+          // if commit in progress and desired state is not a succeeded one,
+          // move to failed
+          if (vertex.recoveryCommitInProgress) {
+            LOG.info("Recovered vertex was in the middle of a commit"
+                + ", failing Vertex=" + vertex.logIdentifier);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = VertexState.SUCCEEDED;
+            vertex.finished(endState);
+          }
+          break;
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          vertex.tasksNotYetScheduled = false;
+          // recover tasks
+          if (vertex.tasks != null) {
+            TaskState taskState = TaskState.KILLED;
+            switch (vertex.recoveredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = vertex.recoveredState;
+            vertex.finished(endState);
+          }
+          break;
+        default:
+          LOG.warn("Invalid recoveredState found when trying to recover"
+              + " vertex, recoveredState=" + vertex.recoveredState);
+          vertex.finished(VertexState.ERROR);
+          endState = VertexState.ERROR;
+          break;
+      }
+      if (!endState.equals(VertexState.RECOVERING)) {
+        LOG.info("Recovered Vertex State"
+            + ", vertexId=" + vertex.logIdentifier
+            + ", state=" + endState
+            + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+            + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+            + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+            + ", recoveredEvents="
+            + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size())
+            + ", tasksIsNull=" + (vertex.tasks == null)
+            + ", numTasks=" + ( vertex.tasks == null ? "null" : vertex.tasks.size()));
+        for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+          vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+              entry.getKey().getVertexId(),
+              vertex.vertexId, endState, null));
         }
       }
-      
-      if (hasBipartite && vertex.inputsWithInitializers != null) {
-        LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
-        return vertex.finished(VertexState.FAILED);
+      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+          .contains(endState)) {
+        // Send events downstream
+        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+        vertex.recoveredEvents.clear();
+      } else {
+        // Ensure no recovered events
+        if (!vertex.recoveredEvents.isEmpty()) {
+          throw new RuntimeException("Invalid Vertex state"
+              + ", found non-zero recovered events in invalid state"
+              + ", recoveredState=" + endState
+              + ", recoveredEvents=" + vertex.recoveredEvents.size());
+        }
       }
-      
-      boolean hasUserVertexManager = vertex.vertexPlan.hasVertexManagerPlugin();
-      
-      if (hasUserVertexManager) {
-        VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
-            .convertVertexManagerPluginDescriptorFromDAGPlan(vertex.vertexPlan
-                .getVertexManagerPlugin());
-        LOG.info("Setting user vertex manager plugin: "
-            + pluginDesc.getClassName() + " on vertex: " + vertex.getName());
-        vertex.vertexManager = new VertexManager(pluginDesc, vertex, vertex.appContext);
+      return endState;
+    }
+
+  }
+
+  private void routeRecoveredEvents(VertexState vertexState,
+      List<TezEvent> tezEvents) {
+    for (TezEvent tezEvent : tezEvents) {
+      EventMetaData sourceMeta = tezEvent.getSourceInfo();
+      TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
+        ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+        ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+        ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+        if (vertexState == VertexState.RUNNING
+            || vertexState == VertexState.INITED) {
+          // Root input events are re-routed to this vertex only while RUNNING or INITED
+          eventHandler.handle(new VertexEventRouteEvent(
+              this.getVertexId(), Collections.singletonList(tezEvent), true));
+        }
+        continue;
+      }
+
+      Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+      Edge destEdge = targetVertices.get(destVertex);
+      if (destEdge == null) {
+        throw new TezUncheckedException("Bad destination vertex: " +
+            sourceMeta.getEdgeVertexName() + " for event vertex: " +
+            getVertexId());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Routing recovered event"
+            + ", eventType=" + tezEvent.getEventType()
+            + ", sourceInfo=" + sourceMeta
+            + ", destinationVertex=" + destVertex.getName());
+      }
+      eventHandler.handle(new VertexEventRouteEvent(destVertex
+          .getVertexId(), Collections.singletonList(tezEvent), true));
+    }
+  }
+
+  public static class RecoverTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    // Invoked once per recovered source vertex; completes recovery when all sources have reported.
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      VertexEventSourceVertexRecovered sourceRecoveredEvent =
+          (VertexEventSourceVertexRecovered) vertexEvent;
+      ++vertex.numRecoveredSourceVertices;
+
+      switch (sourceRecoveredEvent.getSourceVertexState()) {
+        case NEW:
+          // Nothing to do
+          break;
+        case INITED:
+          ++vertex.numInitedSourceVertices;
+          break;
+        case RUNNING:
+        case SUCCEEDED:
+          ++vertex.numInitedSourceVertices;
+          ++vertex.numStartedSourceVertices;
+          if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
+            vertex.pendingReportedSrcCompletions.addAll(
+                sourceRecoveredEvent.getCompletedTaskAttempts());
+          }
+          break;
+        case FAILED:
+        case KILLED:
+        case ERROR:
+          // Nothing to do
+          // Recover as if source vertices have not inited/started
+          break;
+        default:
+          LOG.warn("Received invalid SourceVertexRecovered event"
+              + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
+              + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
+          return vertex.finished(VertexState.ERROR);
+      }
+
+      if (vertex.numRecoveredSourceVertices !=
+          vertex.getInputVerticesCount()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for source vertices to recover"
+              + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+              + ", totalSourceVertices=" + vertex.getInputVerticesCount());
+        }
+        return VertexState.RECOVERING;
+      }
+
+
+      // Complete recovery
+      VertexState endState = VertexState.NEW;
+      List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
+      switch (vertex.recoveredState) {
+        case NEW:
+          // Drop all root events if not inited properly
+          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+          while (iterator.hasNext()) {
+            if (iterator.next().getEventType().equals(
+                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+              iterator.remove();
+            }
+          }
+          // Trigger init if all sources initialized
+          if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+                VertexEventType.V_INIT));
+          }
+          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+                VertexEventType.V_START));
+          }
+          endState = VertexState.NEW;
+          break;
+        case INITED:
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          if (!vertex.setParallelism(0,
+              null, vertex.recoveredSourceEdgeManagers, true)) {
+            LOG.info("Failed to recover edge managers");
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          // Recover tasks
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+          }
+          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          }
+          endState = VertexState.INITED;
+          break;
+        case RUNNING:
+          vertex.tasksNotYetScheduled = false;
+          // if commit in progress and desired state is not a succeeded one,
+          // move to failed
+          if (vertex.recoveryCommitInProgress) {
+            LOG.info("Recovered vertex was in the middle of a commit"
+                + ", failing Vertex=" + vertex.logIdentifier);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, true)) {
+            LOG.info("Failed to recover edge managers");
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = VertexState.SUCCEEDED;
+            vertex.finished(endState);
+          }
+          break;
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          vertex.tasksNotYetScheduled = false;
+          // recover tasks
+          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+          if (vertex.tasks != null) {
+            TaskState taskState = TaskState.KILLED;
+            switch (vertex.recoveredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+            // Wait for all tasks to recover and report back
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = vertex.recoveredState;
+            vertex.finished(endState);
+          }
+          break;
+        default:
+          LOG.warn("Invalid recoveredState found when trying to recover"
+              + " vertex, recoveredState=" + vertex.recoveredState);
+          vertex.finished(VertexState.ERROR);
+          endState = VertexState.ERROR;
+          break;
+      }
+
+      LOG.info("Recovered Vertex State"
+          + ", vertexId=" + vertex.logIdentifier
+          + ", state=" + endState
+          + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+          + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+          + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+          + ", tasksIsNull=" + (vertex.tasks == null)
+          + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size()));
+
+      for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+        vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+            entry.getKey().getVertexId(),
+            vertex.vertexId, endState, completedTaskAttempts));
+      }
+      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+          .contains(endState)) {
+        // Send events downstream
+        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+        vertex.recoveredEvents.clear();
       } else {
-        if (hasBipartite) {
-          // setup vertex manager
-          // TODO this needs to consider data size and perhaps API.
-          // Currently implicitly BIPARTITE is the only edge type
-          LOG.info("Setting vertexManager to ShuffleVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(new ShuffleVertexManager(),
-              vertex, vertex.appContext);
-        } else if (vertex.inputsWithInitializers != null) {
-          LOG.info("Setting vertexManager to RootInputVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(new RootInputVertexManager(),
-              vertex, vertex.appContext);
-        } else {
-          // schedule all tasks upon vertex start. Default behavior.
-          LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(
-              new ImmediateStartVertexManager(), vertex, vertex.appContext);
+        // Ensure no recovered events
+        if (!vertex.recoveredEvents.isEmpty()) {
+          throw new RuntimeException("Invalid Vertex state"
+              + ", found non-zero recovered events in invalid state"
+              + ", recoveredState=" + endState
+              + ", recoveredEvents=" + vertex.recoveredEvents.size());
         }
       }
-      
-      vertex.vertexManager.initialize();
-
-      // Setup tasks early if possible. If the VertexManager is not being used
-      // to set parallelism, sending events to Tasks is safe (and less confusing
-      // then relying on tasks to be created after TaskEvents are generated).
-      // For VertexManagers setting parallelism, the setParallelism call needs
-      // to be inline.
-      vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
-      if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
-        vertex.addDiagnostic("Invalid task count for vertex"
-          + ", numTasks=" + vertex.numTasks);
-        vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
-        vertex.abortVertex(VertexStatus.State.FAILED);
-        return vertex.finished(VertexState.FAILED);
+      return endState;
+    }
+
+  }
+
+  public static class InitTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexState vertexState = VertexState.NEW; // stays NEW until every source vertex has inited
+      vertex.numInitedSourceVertices++; // counted against sourceVertices.size() below
+      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+        vertexState = handleInitEvent(vertex, event); // all sources inited (or none): set up this vertex
+        if (vertexState != VertexState.FAILED) {
+          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+            for (Vertex target : vertex.targetVertices.keySet()) { // propagate V_INIT downstream
+              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+                VertexEventType.V_INIT));
+            }
+          }
+        }
+      }
+      return vertexState;
+    }
 
-      vertex.checkTaskLimits();
+    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
+      VertexState state = vertex.setupVertex();
+      if (state.equals(VertexState.FAILED)) {
+        return state;
+      }
 
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
@@ -1862,7 +2571,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.numSuccessSourceAttemptCompletions++;
         if (vertex.getState() == VertexState.RUNNING) {
           vertex.vertexManager.onSourceTaskCompleted(completionEvent
-              .getTaskAttemptId());
+              .getTaskAttemptId().getTaskID());
         } else {
           vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
         }
@@ -2034,9 +2743,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+      boolean recovered = rEvent.isRecovered();
       List<TezEvent> tezEvents = rEvent.getEvents();
 
       if (vertex.getAppContext().isRecoveryEnabled()
+          && !recovered
           && !tezEvents.isEmpty()) {
         List<TezEvent> dataMovementEvents =
             Lists.newArrayList();
@@ -2061,7 +2772,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       for(TezEvent tezEvent : tezEvents) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Vertex: " + vertex.getName() + " routing event: "
-              + tezEvent.getEventType());
+              + tezEvent.getEventType()
+              + " Recovered:" + recovered);
         }
         EventMetaData sourceMeta = tezEvent.getSourceInfo();
         switch(tezEvent.getEventType()) {
@@ -2074,7 +2786,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
               if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                 ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
-              } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { 
+              } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                 ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
               } else {
                 ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
@@ -2396,7 +3108,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.vertexManager;
   }
 
-  private static void logLocationHints(VertexLocationHint locationHint) {
+  private static void logLocationHints(String vertexName,
+      VertexLocationHint locationHint) {
+    if (locationHint == null) {
+      LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
+      return;
+    }
     Multiset<String> hosts = HashMultiset.create();
     Multiset<String> racks = HashMultiset.create();
     int counter = 0;
@@ -2421,18 +3138,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           sb.append(rack).append(", ");
         }
       }
-      LOG.debug("Location: " + counter + " : " + sb.toString());
+      LOG.debug("Vertex: " + vertexName + ", Location: "
+          + counter + " : " + sb.toString());
       counter++;
     }
 
-    LOG.debug("Host Counts");
+    LOG.debug("Vertex: " + vertexName + ", Host Counts");
     for (Multiset.Entry<String> host : hosts.entrySet()) {
-      LOG.debug("host: " + host.toString());
+      LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
     }
 
-    LOG.debug("Rack Counts");
+    LOG.debug("Vertex: " + vertexName + ", Rack Counts");
     for (Multiset.Entry<String> rack : racks.entrySet()) {
-      LOG.debug("rack: " + rack.toString());
+      LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index e08e7ed..6f75481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -147,6 +147,11 @@ public class VertexManager {
       managedVertex.setVertexLocationHint(locationHint);
     }
 
+    @Override
+    public int getDAGAttemptNumber() {
+      return appContext.getApplicationAttemptId().getAttemptId();
+    }
+
     private void verifyIsRootInput(String inputName) {
       Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
           "Cannot add events for non-root inputs");
@@ -222,23 +227,24 @@ public class VertexManager {
 
   public void onVertexStarted(List<TezTaskAttemptID> completions) {
     Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
-    for (TezTaskAttemptID attemptId : completions) {
-      TezTaskID tezTaskId = attemptId.getTaskID();
-      Integer taskId = new Integer(tezTaskId.getId());
-      String vertexName = 
-          appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
-      List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
-      if (taskIdList == null) {
-        taskIdList = Lists.newArrayList();
-        pluginCompletionsMap.put(vertexName, taskIdList);
+    if (completions != null && !completions.isEmpty()) {
+      for (TezTaskAttemptID tezTaskAttemptID : completions) {
+        Integer taskId = new Integer(tezTaskAttemptID.getTaskID().getId());
+        String vertexName =
+            appContext.getCurrentDAG().getVertex(
+                tezTaskAttemptID.getTaskID().getVertexID()).getName();
+        List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
+        if (taskIdList == null) {
+          taskIdList = Lists.newArrayList();
+          pluginCompletionsMap.put(vertexName, taskIdList);
+        }
+        taskIdList.add(taskId);
       }
-      taskIdList.add(taskId);
     }
     plugin.onVertexStarted(pluginCompletionsMap);
   }
 
-  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
-    TezTaskID tezTaskId = attemptId.getTaskID();
+  public void onSourceTaskCompleted(TezTaskID tezTaskId) {
     Integer taskId = new Integer(tezTaskId.getId());
     String vertexName = 
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a71686b..7b2087a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -27,11 +27,14 @@ public enum HistoryEventType {
   DAG_FINISHED,
   VERTEX_INITIALIZED,
   VERTEX_STARTED,
+  VERTEX_PARALLELISM_UPDATED,
   VERTEX_FINISHED,
   TASK_STARTED,
   TASK_FINISHED,
   TASK_ATTEMPT_STARTED,
   TASK_ATTEMPT_FINISHED,
   CONTAINER_LAUNCHED,
-  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED
+  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
+  DAG_COMMIT_STARTED,
+  VERTEX_COMMIT_STARTED
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
index 4ec0632..690e850 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
@@ -22,16 +22,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.recovery.RecoveryService;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 4794a7b..54bc658 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -98,7 +98,7 @@ public class AMLaunchedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return true;
+    return false;
   }
 
   @Override
@@ -139,4 +139,16 @@ public class AMLaunchedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getAppSubmitTime() {
+    return appSubmitTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index b3cbb5c..e66141b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -129,4 +129,13 @@ public class AMStartedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 066f315..471ddd1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -138,4 +138,16 @@ public class ContainerLaunchedEvent implements HistoryEvent {
         + ", launchTime=" + launchTime;
   }
 
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
 }


[4/4] git commit: TEZ-847. Support basic AM recovery. (hitesh)

Posted by hi...@apache.org.
TEZ-847. Support basic AM recovery. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5b464f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5b464f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5b464f27

Branch: refs/heads/master
Commit: 5b464f27da273723e422607518c271cd3f040560
Parents: 18290c8
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 5 15:34:53 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 5 15:34:53 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezSession.java  |    2 -
 .../apache/tez/dag/api/TezConfiguration.java    |   10 +-
 .../tez/dag/api/VertexManagerPluginContext.java |    5 +
 .../apache/tez/runtime/api/OutputCommitter.java |   18 +
 .../tez/runtime/api/OutputCommitterContext.java |   10 +
 tez-api/src/main/proto/DAGApiRecords.proto      |   15 +-
 .../tez/dag/api/client/VertexStatusBuilder.java |    2 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   72 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  613 ++++++++++
 .../java/org/apache/tez/dag/app/dag/DAG.java    |    3 +
 .../java/org/apache/tez/dag/app/dag/Task.java   |    3 +
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |    6 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |    5 +
 .../org/apache/tez/dag/app/dag/VertexState.java |    1 +
 .../tez/dag/app/dag/event/DAGEventType.java     |    6 +-
 .../dag/app/dag/event/TaskAttemptEventType.java |    3 +
 .../dag/app/dag/event/TaskEventRecoverTask.java |   41 +
 .../tez/dag/app/dag/event/TaskEventType.java    |    6 +-
 .../app/dag/event/VertexEventRecoverVertex.java |   36 +
 .../app/dag/event/VertexEventRouteEvent.java    |   14 +-
 .../event/VertexEventSourceVertexRecovered.java |   56 +
 .../tez/dag/app/dag/event/VertexEventType.java  |    8 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  465 +++++---
 .../dag/impl/OutputCommitterContextImpl.java    |   10 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   87 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  234 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1054 +++++++++++++++---
 .../tez/dag/app/dag/impl/VertexManager.java     |   30 +-
 .../tez/dag/history/HistoryEventType.java       |    5 +-
 .../apache/tez/dag/history/ats/ATSService.java  |    7 -
 .../tez/dag/history/events/AMLaunchedEvent.java |   14 +-
 .../tez/dag/history/events/AMStartedEvent.java  |    9 +
 .../history/events/ContainerLaunchedEvent.java  |   12 +
 .../history/events/DAGCommitStartedEvent.java   |   94 ++
 .../dag/history/events/DAGFinishedEvent.java    |   67 +-
 .../dag/history/events/DAGInitializedEvent.java |    7 +
 .../tez/dag/history/events/DAGStartedEvent.java |    7 +
 .../dag/history/events/DAGSubmittedEvent.java   |   11 +
 .../events/TaskAttemptFinishedEvent.java        |   51 +-
 .../history/events/TaskAttemptStartedEvent.java |   17 +-
 .../dag/history/events/TaskFinishedEvent.java   |   56 +-
 .../dag/history/events/TaskStartedEvent.java    |   14 +-
 .../events/VertexCommitStartedEvent.java        |   94 ++
 .../VertexDataMovementEventsGeneratedEvent.java |    8 +
 .../dag/history/events/VertexFinishedEvent.java |   59 +-
 .../history/events/VertexInitializedEvent.java  |   78 +-
 .../events/VertexParallelismUpdatedEvent.java   |  159 +++
 .../dag/history/events/VertexStartedEvent.java  |   14 +-
 .../dag/history/recovery/RecoveryService.java   |  171 ++-
 .../apache/tez/dag/recovery/RecoveryParser.java |  186 ----
 tez-dag/src/main/proto/HistoryEvents.proto      |   23 +-
 .../dag/api/client/TestVertexStatusBuilder.java |    8 +-
 .../TestHistoryEventsProtoConversion.java       |  569 ++++++++++
 tez-dist/pom.xml                                |   12 +-
 .../mapreduce/committer/MROutputCommitter.java  |   34 +-
 .../tez/runtime/api/impl/EventMetaData.java     |    2 +-
 .../apache/tez/runtime/api/impl/InputSpec.java  |    4 +
 .../vertexmanager/ShuffleVertexManager.java     |    4 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  135 +++
 .../apache/tez/test/dag/MultiAttemptDAG.java    |  177 +++
 60 files changed, 4259 insertions(+), 664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 8f3101e..9e29910 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -116,8 +116,6 @@ public class TezSession {
               sessionConfig.getTezConfiguration(), applicationId,
               null, sessionName, sessionConfig.getAMConfiguration(),
               tezJarResources, sessionCredentials);
-      // Set Tez Sessions to not retry on AM crashes
-      appContext.setMaxAppAttempts(1);
       yarnClient.submitApplication(appContext);
     } catch (YarnException e) {
       throw new TezException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 142d2d9..29d04ab 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -369,12 +369,20 @@ public class TezConfiguration extends Configuration {
 
   public static final String DAG_RECOVERY_ENABLED =
       TEZ_PREFIX + "dag.recovery.enabled";
-  public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = false;
+  public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
 
   public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
       TEZ_PREFIX + "dag.recovery.io.buffer.size";
   public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
 
+  public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
+      TEZ_PREFIX + "dag.recovery.max.unflushed.events";
+  public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
+
+  public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
+      TEZ_PREFIX + "dag.recovery.flush.interval.secs";
+  public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
+
   public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
   public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
   public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index a281b90..76202f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -125,4 +125,9 @@ public interface VertexManagerPluginContext {
    * @param locationHint
    */
   public void setVertexLocationHint(VertexLocationHint locationHint);
+
+  /**
+   * @return DAG Attempt number
+   */
+  public int getDAGAttemptNumber();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 301d01b..aadbf12 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -69,4 +69,22 @@ public abstract class OutputCommitter {
   public abstract void abortOutput(VertexStatus.State finalState)
     throws Exception;
 
+  /**
+   * Whether the OutputCommitter supports recovery of output from a Task
+   * that completed in a previous DAG attempt
+   * @return True if recovery supported
+   */
+  public boolean isTaskRecoverySupported() {
+    return true;
+  }
+
+  /**
+   * Recover task output from a previous DAG attempt
+   * @param taskIndex Index of task to be recovered
+   * @param previousDAGAttempt Previous DAG Attempt Number
+   * @throws java.lang.Exception
+   */
+  public void recoverTask(int taskIndex, int previousDAGAttempt)  throws Exception {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 3132758..b5837e8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
@@ -63,4 +65,12 @@ public interface OutputCommitterContext {
    */
   public byte[] getUserPayload();
 
+  /**
+   * Get Vertex Index in the DAG
+   * @return Vertex index
+   */
+  @Unstable
+  @Evolving
+  public int getVertexIndex();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 5faa7f1..c7a317e 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -171,13 +171,14 @@ message ProgressProto {
 enum VertexStatusStateProto {
   VERTEX_NEW = 0;
   VERTEX_INITIALIZING = 1;
-  VERTEX_INITED = 2;
-  VERTEX_RUNNING = 3;
-  VERTEX_SUCCEEDED = 4;
-  VERTEX_FAILED = 5;
-  VERTEX_KILLED = 6;
-  VERTEX_ERROR = 7;
-  VERTEX_TERMINATING = 8;
+  VERTEX_RECOVERING = 2;
+  VERTEX_INITED = 3;
+  VERTEX_RUNNING = 4;
+  VERTEX_SUCCEEDED = 5;
+  VERTEX_FAILED = 6;
+  VERTEX_KILLED = 7;
+  VERTEX_ERROR = 8;
+  VERTEX_TERMINATING = 9;
 }
 
 message VertexStatusProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 0e693b8..dc24f7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -66,6 +66,8 @@ public class VertexStatusBuilder extends VertexStatus {
         return VertexStatusStateProto.VERTEX_NEW;
       case INITIALIZING:
         return VertexStatusStateProto.VERTEX_INITIALIZING;
+      case RECOVERING:
+        return VertexStatusStateProto.VERTEX_NEW;
       case INITED:
         return VertexStatusStateProto.VERTEX_INITED;
       case RUNNING:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ce07fb..c8185b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -48,6 +48,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -220,6 +222,8 @@ public class DAGAppMaster extends AbstractService {
   private FileSystem recoveryFS;
   private int recoveryBufferSize;
 
+  protected boolean isLastAMRetry = false;
+
   // DAG Counter
   private final AtomicInteger dagCounter = new AtomicInteger();
 
@@ -262,6 +266,14 @@ public class DAGAppMaster extends AbstractService {
   @Override
   public synchronized void serviceInit(final Configuration conf) throws Exception {
 
+    int maxAppAttempts = 1;
+    String maxAppAttemptsEnv = System.getenv(
+        ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
+    if (maxAppAttemptsEnv != null) {
+      maxAppAttempts = Integer.valueOf(maxAppAttemptsEnv);
+    }
+    isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+
     this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
@@ -481,6 +493,11 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  public void setCurrentDAG(DAG currentDAG) {
+    this.currentDAG = currentDAG;
+    context.setDAG(currentDAG);
+  }
+
   private class DAGAppMasterEventHandler implements
       EventHandler<DAGAppMasterEvent> {
     @Override
@@ -539,7 +556,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   /** Create and initialize (but don't start) a single dag. */
-  protected DAG createDAG(DAGPlan dagPB, TezDAGID dagId) {
+  DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {
     if (dagId == null) {
       dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
           dagCounter.incrementAndGet());
@@ -566,7 +583,7 @@ public class DAGAppMaster extends AbstractService {
     TokenCache.setSessionToken(sessionToken, dagCredentials);
 
     // create single dag
-    DAG newDag =
+    DAGImpl newDag =
         new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, dagCredentials, clock,
             appMasterUgi.getShortUserName(),
@@ -989,6 +1006,7 @@ public class DAGAppMaster extends AbstractService {
         return TezSessionStatus.INITIALIZING;
       case IDLE:
         return TezSessionStatus.READY;
+      case RECOVERING:
       case RUNNING:
         return TezSessionStatus.RUNNING;
       case ERROR:
@@ -1328,6 +1346,31 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  private DAG recoverDAG() throws IOException {
+    DAG recoveredDAG = null;
+    if (recoveryEnabled) {
+      if (this.appAttemptID.getAttemptId() > 1) {
+        this.state = DAGAppMasterState.RECOVERING;
+        RecoveryParser recoveryParser = new RecoveryParser(
+            this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+        recoveredDAG = recoveryParser.parseRecoveryData();
+        if (recoveredDAG != null) {
+          LOG.info("Found DAG to recover, dagId=" + recoveredDAG.getID());
+          _updateLoggers(recoveredDAG, "");
+          DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAG.getID(),
+              DAGEventType.DAG_RECOVER);
+          dagEventDispatcher.handle(recoverDAGEvent);
+          this.state = DAGAppMasterState.RUNNING;
+        } else {
+          LOG.info("No DAG to recover");
+          this.state = DAGAppMasterState.IDLE;
+        }
+      }
+    }
+    return recoveredDAG;
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
   public synchronized void serviceStart() throws Exception {
 
@@ -1348,18 +1391,18 @@ public class DAGAppMaster extends AbstractService {
     this.lastDAGCompletionTime = clock.getTime();
 
     if (!isSession) {
-      startDAG();
+      DAG recoveredDAG = null;
+      if (appAttemptID.getAttemptId() != 1) {
+        recoveredDAG = recoverDAG();
+      }
+      if (recoveredDAG == null) {
+        dagCounter.set(0);
+        startDAG();
+      }
     } else {
       LOG.info("In Session mode. Waiting for DAG over RPC");
       this.state = DAGAppMasterState.IDLE;
-
-      if (recoveryEnabled) {
-        if (this.appAttemptID.getAttemptId() > 0) {
-          // Recovery data and copy over into new recovery dir
-          this.state = DAGAppMasterState.IDLE;
-          // TODO
-        }
-      }
+      recoverDAG();
 
       this.dagSubmissionTimer = new Timer(true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@@ -1570,6 +1613,9 @@ public class DAGAppMaster extends AbstractService {
       }
 
       appMaster.stop();
+
+
+
     }
   }
 
@@ -1672,4 +1718,8 @@ public class DAGAppMaster extends AbstractService {
   private void sendEvent(Event<?> event) {
     dispatcher.getEventHandler().handle(event);
   }
+
+  synchronized void setDAGCounter(int dagCounter) {
+    this.dagCounter.set(dagCounter);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
new file mode 100644
index 0000000..9e59849
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Reads the recovery data written by an earlier AM attempt, copies it into
+ * the current attempt's recovery directory, and replays the events of the
+ * single DAG that was still in progress into a freshly created DAG.
+ *
+ * <p>Layout: every attempt writes into {@code <recoveryDataDir>/<attemptId>}.
+ * That directory holds a summary file (one delimited
+ * {@code SummaryEventProto} per record) and one recovery file per DAG. Each
+ * record in a DAG file is an int {@link HistoryEventType} ordinal followed by
+ * the event's own proto encoding. Once an attempt has finished copying data
+ * from its predecessor it creates a marker file named
+ * {@value #DATA_RECOVERED_FILE_FLAG}.
+ */
+public class RecoveryParser {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
+
+  /** Marker file created once data has been copied into an attempt's dir. */
+  private static final String DATA_RECOVERED_FILE_FLAG = "dataRecovered";
+
+  private final DAGAppMaster dagAppMaster;
+  private final FileSystem recoveryFS;
+  private final Path recoveryDataDir;
+  private final Path currentAttemptRecoveryDataDir;
+  private final int recoveryBufferSize;
+  private final int currentAttemptId;
+
+  /**
+   * @param dagAppMaster AM whose state is restored; its configuration also
+   *                     supplies the recovery file I/O buffer size
+   * @param recoveryFS file system holding the recovery data
+   * @param recoveryDataDir root directory; each attempt uses the
+   *                        sub-directory named after its attempt id
+   * @param currentAttemptId id of the attempt performing the recovery
+   */
+  public RecoveryParser(DAGAppMaster dagAppMaster,
+      FileSystem recoveryFS,
+      Path recoveryDataDir,
+      int currentAttemptId) {
+    this.dagAppMaster = dagAppMaster;
+    this.recoveryFS = recoveryFS;
+    this.recoveryDataDir = recoveryDataDir;
+    this.currentAttemptId = currentAttemptId;
+    this.currentAttemptRecoveryDataDir =
+        getAttemptRecoveryDataDir(recoveryDataDir, currentAttemptId);
+    recoveryBufferSize = dagAppMaster.getConfig().getInt(
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+  }
+
+  /** Closes the stream if non-null, logging (not throwing) any failure. */
+  private static void closeQuietly(Closeable stream) {
+    if (stream == null) {
+      return;
+    }
+    try {
+      stream.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close recovery stream", e);
+    }
+  }
+
+  /** Logs, at INFO, every record of a summary stream (debugging aid). */
+  private static void parseSummaryFile(FSDataInputStream inputStream)
+      throws IOException {
+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      if (proto == null) {
+        // parseDelimitedFrom returns null when already at end of stream
+        break;
+      }
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
+    }
+  }
+
+  /**
+   * Reads one record from a DAG recovery stream: an int event-type ordinal
+   * followed by the event body, which the event decodes itself.
+   *
+   * @return the decoded event; never null
+   * @throws IOException if the ordinal is out of range, the type is not
+   *         supported, or the underlying read fails
+   */
+  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
+      throws IOException {
+    int eventTypeOrdinal = inputStream.readInt();
+    if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
+        HistoryEventType.values().length) {
+      // Not a valid event type: the data is corrupt
+      throw new IOException("Corrupt data found when trying to read next event type"
+          + ", eventTypeOrdinal=" + eventTypeOrdinal);
+    }
+    HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
+    HistoryEvent event;
+    switch (eventType) {
+      case AM_LAUNCHED:
+        event = new AMLaunchedEvent();
+        break;
+      case AM_STARTED:
+        event = new AMStartedEvent();
+        break;
+      case DAG_SUBMITTED:
+        event = new DAGSubmittedEvent();
+        break;
+      case DAG_INITIALIZED:
+        event = new DAGInitializedEvent();
+        break;
+      case DAG_STARTED:
+        event = new DAGStartedEvent();
+        break;
+      case DAG_COMMIT_STARTED:
+        event = new DAGCommitStartedEvent();
+        break;
+      case DAG_FINISHED:
+        event = new DAGFinishedEvent();
+        break;
+      case CONTAINER_LAUNCHED:
+        event = new ContainerLaunchedEvent();
+        break;
+      case VERTEX_INITIALIZED:
+        event = new VertexInitializedEvent();
+        break;
+      case VERTEX_STARTED:
+        event = new VertexStartedEvent();
+        break;
+      case VERTEX_PARALLELISM_UPDATED:
+        event = new VertexParallelismUpdatedEvent();
+        break;
+      case VERTEX_COMMIT_STARTED:
+        event = new VertexCommitStartedEvent();
+        break;
+      case VERTEX_FINISHED:
+        event = new VertexFinishedEvent();
+        break;
+      case TASK_STARTED:
+        event = new TaskStartedEvent();
+        break;
+      case TASK_FINISHED:
+        event = new TaskFinishedEvent();
+        break;
+      case TASK_ATTEMPT_STARTED:
+        event = new TaskAttemptStartedEvent();
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        event = new TaskAttemptFinishedEvent();
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        event = new VertexDataMovementEventsGeneratedEvent();
+        break;
+      default:
+        throw new IOException("Invalid data found, unknown event type "
+            + eventType);
+
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsing event from input stream"
+          + ", eventType=" + eventType);
+    }
+    event.fromProtoStream(inputStream);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsed event from input stream"
+          + ", eventType=" + eventType
+          + ", event=" + event.toString());
+    }
+    return event;
+  }
+
+  /** Logs, at INFO, every event of a DAG recovery stream (debugging aid). */
+  private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+      throws IOException {
+    while (inputStream.available() > 0) {
+      HistoryEvent historyEvent = getNextEvent(inputStream);
+      LOG.info("Parsed event from recovery stream"
+          + ", eventType=" + historyEvent.getEventType()
+          + ", event=" + historyEvent);
+    }
+  }
+
+  /** Returns {@code <recoveryDataDir>/<attemptId>}. */
+  private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
+      int attemptId) {
+    return new Path(recoveryDataDir, Integer.toString(attemptId));
+  }
+
+  /**
+   * Debugging entry point that dumps recovery files to the log.
+   *
+   * <p>Usage: {@code RecoveryParser <summaryFile> [dagRecoveryFile ...]}.
+   * Paths are resolved against the file system returned by
+   * {@link FileSystem#get(Configuration)} for a default configuration.
+   *
+   * @throws IllegalArgumentException if no summary file is given
+   */
+  public static void main(String argv[]) throws IOException {
+    if (argv.length < 1) {
+      throw new IllegalArgumentException(
+          "Usage: RecoveryParser <summaryFile> [dagRecoveryFile ...]");
+    }
+    Configuration conf = new Configuration();
+    String summaryPath = argv[0];
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Parsing Summary file " + summaryPath);
+    FSDataInputStream summaryIn = fs.open(new Path(summaryPath));
+    try {
+      parseSummaryFile(summaryIn);
+    } finally {
+      closeQuietly(summaryIn);
+    }
+    for (int i = 1; i < argv.length; ++i) {
+      String dagPath = argv[i];
+      LOG.info("Parsing DAG recovery file " + dagPath);
+      FSDataInputStream dagIn = fs.open(new Path(dagPath));
+      try {
+        parseDAGRecoveryFile(dagIn);
+      } finally {
+        closeQuietly(dagIn);
+      }
+    }
+  }
+
+  /** Path of the summary file, named after the application id, in a dir. */
+  private Path getSummaryPath(Path recoveryDataDir) {
+    return new Path(recoveryDataDir,
+        dagAppMaster.getAttemptID().getApplicationId().toString()
+        + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+  }
+
+  /** Creates (overwriting if present) a summary file for writing. */
+  private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
+      throws IOException {
+    return recoveryFS.create(summaryPath, true, recoveryBufferSize);
+  }
+
+  /** Opens a summary file for reading, or returns null if it is absent. */
+  private FSDataInputStream getSummaryStream(Path summaryPath)
+      throws IOException {
+    if (!recoveryFS.exists(summaryPath)) {
+      return null;
+    }
+    return recoveryFS.open(summaryPath, recoveryBufferSize);
+  }
+
+  /** Path of the recovery file for a DAG within a recovery dir. */
+  private Path getDAGRecoveryFilePath(Path recoveryDataDir,
+      TezDAGID dagID) {
+    return new Path(recoveryDataDir,
+        dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+  }
+
+  /** Opens a DAG recovery file for reading, or returns null if absent. */
+  private FSDataInputStream getDAGRecoveryStream(Path recoveryDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID);
+    if (!recoveryFS.exists(dagRecoveryPath)) {
+      return null;
+    }
+    return recoveryFS.open(dagRecoveryPath, recoveryBufferSize);
+  }
+
+  /** Creates (overwriting if present) a DAG recovery file for writing. */
+  private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    return recoveryFS.create(getDAGRecoveryFilePath(recoveryDataDir, dagID),
+        true, recoveryBufferSize);
+  }
+
+  /**
+   * Returns the only DAG whose status is {@code false} (not finished), or
+   * null if every DAG finished.
+   *
+   * @throws RuntimeException if more than one unfinished DAG is present
+   */
+  private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
+    TezDAGID inProgressDAG = null;
+    for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
+      if (!entry.getValue().booleanValue()) {
+        if (inProgressDAG != null) {
+          throw new RuntimeException("Multiple in progress DAGs seen"
+              + ", dagId=" + inProgressDAG
+              + ", dagId=" + entry.getKey());
+        }
+        inProgressDAG = entry.getKey();
+      }
+    }
+    return inProgressDAG;
+  }
+
+  /**
+   * Finds the most recent earlier attempt whose directory carries the
+   * data-recovered marker, scanning from {@code currentAttemptId - 1} down to
+   * 1. Falls back to attempt 1 when no marker is found.
+   */
+  private Path getPreviousAttemptRecoveryDataDir() {
+    int foundPreviousAttempt = -1;
+    for (int i = currentAttemptId - 1; i > 0; --i) {
+      Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+      Path dataRecoveredFile = new Path(attemptPath, DATA_RECOVERED_FILE_FLAG);
+      try {
+        if (recoveryFS.exists(dataRecoveredFile)) {
+          foundPreviousAttempt = i;
+          break;
+        }
+      } catch (IOException e) {
+        LOG.warn("Exception when checking previous attempt dir for "
+            + dataRecoveredFile.toString(), e);
+      }
+    }
+    if (foundPreviousAttempt == -1) {
+      LOG.info("Falling back to first attempt as no other recovered attempts"
+          + " found");
+      foundPreviousAttempt = 1;
+    }
+
+    return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
+  }
+
+  /**
+   * Copies every record of the previous summary into a new summary file and
+   * tracks each DAG's completion status: {@code false} on DAG_SUBMITTED and
+   * {@code true} on DAG_FINISHED.
+   *
+   * @param summaryPath path of {@code summaryStream}, used for logging
+   * @param summaryStream previous attempt's summary; not closed here
+   * @param newSummaryPath summary file to create in the current attempt dir
+   * @param seenDAGs receives the completion status of each DAG
+   * @return the highest DAG id seen, or 0 if there were no records
+   */
+  private int copySummaryFile(Path summaryPath,
+      FSDataInputStream summaryStream, Path newSummaryPath,
+      Map<TezDAGID, Boolean> seenDAGs) throws IOException {
+    FSDataOutputStream newSummaryStream =
+        getSummaryOutputStream(newSummaryPath);
+    try {
+      FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
+      LOG.info("Parsing summary file"
+          + ", path=" + summaryPath.toString()
+          + ", len=" + summaryFileStatus.getLen()
+          + ", lastModTime=" + summaryFileStatus.getModificationTime());
+
+      int dagCounter = 0;
+      while (summaryStream.available() > 0) {
+        RecoveryProtos.SummaryEventProto proto =
+            RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+        if (proto == null) {
+          // parseDelimitedFrom returns null when already at end of stream
+          break;
+        }
+        HistoryEventType eventType =
+            HistoryEventType.values()[proto.getEventType()];
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("[RECOVERY SUMMARY]"
+              + " dagId=" + proto.getDagId()
+              + ", timestamp=" + proto.getTimestamp()
+              + ", event=" + eventType);
+        }
+        TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
+        if (dagCounter < dagId.getId()) {
+          dagCounter = dagId.getId();
+        }
+        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+          seenDAGs.put(dagId, false);
+        } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+          seenDAGs.put(dagId, true);
+        }
+        proto.writeDelimitedTo(newSummaryStream);
+      }
+      newSummaryStream.hsync();
+      newSummaryStream.close();
+      // Closed successfully; nothing left for the finally block to clean up
+      newSummaryStream = null;
+      return dagCounter;
+    } finally {
+      closeQuietly(newSummaryStream);
+    }
+  }
+
+  /** Logs, at INFO, an event that is about to be replayed. */
+  private static void logRecoveryEvent(HistoryEvent event) {
+    LOG.info("Recovering from event"
+        + ", eventType=" + event.getEventType()
+        + ", event=" + event.toString());
+  }
+
+  /**
+   * Fails when an event that needs a DAG is replayed before DAG_SUBMITTED has
+   * created one. Replaces an {@code assert}, which is disabled in production
+   * and would otherwise surface as a NullPointerException.
+   */
+  private static void checkDAGCreated(DAG dag, HistoryEventType eventType)
+      throws IOException {
+    if (dag == null) {
+      throw new IOException("Corrupt recovery data: found event " + eventType
+          + " before " + HistoryEventType.DAG_SUBMITTED);
+    }
+  }
+
+  /**
+   * Applies one recovered event.
+   *
+   * <p>DAG_SUBMITTED creates the DAG (and registers it as the AM's current
+   * DAG). CONTAINER_LAUNCHED is ignored. Every other supported event is handed
+   * to the DAG, vertex or task it refers to.
+   *
+   * @param recoveredDAG DAG built so far; null until DAG_SUBMITTED is seen
+   * @param event event to apply
+   * @param dagId id to give a DAG created from DAG_SUBMITTED
+   * @return the DAG after applying the event (newly created for DAG_SUBMITTED)
+   * @throws IOException if the event needs a DAG and none was created yet
+   * @throws RuntimeException for event types that cannot be replayed here
+   */
+  private DAGImpl applyRecoveryEvent(DAGImpl recoveredDAG, HistoryEvent event,
+      TezDAGID dagId) throws IOException {
+    HistoryEventType eventType = event.getEventType();
+    switch (eventType) {
+      case DAG_SUBMITTED:
+      {
+        logRecoveryEvent(event);
+        DAGImpl createdDAG = dagAppMaster.createDAG(
+            ((DAGSubmittedEvent) event).getDAGPlan(), dagId);
+        dagAppMaster.setCurrentDAG(createdDAG);
+        return createdDAG;
+      }
+      case DAG_INITIALIZED:
+      case DAG_STARTED:
+      case DAG_COMMIT_STARTED:
+      case DAG_FINISHED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        recoveredDAG.restoreFromEvent(event);
+        break;
+      }
+      case CONTAINER_LAUNCHED:
+      {
+        // Nothing to do?
+        break;
+      }
+      case VERTEX_INITIALIZED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      case VERTEX_STARTED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexStartedEvent vEvent = (VertexStartedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      case VERTEX_PARALLELISM_UPDATED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexParallelismUpdatedEvent vEvent =
+            (VertexParallelismUpdatedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      case VERTEX_COMMIT_STARTED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      case VERTEX_FINISHED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      case TASK_STARTED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        TaskStartedEvent tEvent = (TaskStartedEvent) event;
+        Task task = recoveredDAG.getVertex(
+            tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+        task.restoreFromEvent(tEvent);
+        break;
+      }
+      case TASK_FINISHED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
+        Task task = recoveredDAG.getVertex(
+            tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+        task.restoreFromEvent(tEvent);
+        break;
+      }
+      case TASK_ATTEMPT_STARTED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
+        Task task =
+            recoveredDAG.getVertex(
+                tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                .getTask(tEvent.getTaskAttemptID().getTaskID());
+        task.restoreFromEvent(tEvent);
+        break;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
+        Task task =
+            recoveredDAG.getVertex(
+                tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                .getTask(tEvent.getTaskAttemptID().getTaskID());
+        task.restoreFromEvent(tEvent);
+        break;
+      }
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      {
+        logRecoveryEvent(event);
+        checkDAGCreated(recoveredDAG, eventType);
+        VertexDataMovementEventsGeneratedEvent vEvent =
+            (VertexDataMovementEventsGeneratedEvent) event;
+        Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+        v.restoreFromEvent(vEvent);
+        break;
+      }
+      default:
+        throw new RuntimeException("Invalid data found, unknown event type "
+            + eventType);
+    }
+    return recoveredDAG;
+  }
+
+  /** Creates the data-recovered marker file in the current attempt dir. */
+  private void writeDataRecoveredFlag() throws IOException {
+    Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
+        DATA_RECOVERED_FILE_FLAG);
+    LOG.info("Finished copying data from previous attempt into current attempt"
+        + " - setting flag by creating file"
+        + ", path=" + dataCopiedFlagPath.toString());
+    FSDataOutputStream flagFile =
+        recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
+    try {
+      flagFile.writeInt(1);
+      flagFile.hsync();
+      flagFile.close();
+      flagFile = null;
+    } finally {
+      closeQuietly(flagFile);
+    }
+  }
+
+  /**
+   * Recovers the in-progress DAG, if any, from the most recent usable
+   * previous attempt.
+   *
+   * <p>Side effects: copies the previous summary into the current attempt
+   * dir, sets the AM's DAG counter to the highest DAG id seen, and copies the
+   * replayed DAG events into the current attempt dir. When the whole DAG file
+   * was replayed, it also creates the data-recovered marker.
+   *
+   * @return the recovered DAG, or null if there is no previous data, no
+   *         summary, or no unfinished DAG
+   * @throws IOException if the in-progress DAG has no recovery file, the
+   *         data is inconsistent, or file I/O fails
+   */
+  public DAG parseRecoveryData() throws IOException {
+    Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
+    LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+        + " for recovering data from previous attempt");
+    if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
+      LOG.info("Nothing to recover as previous attempt data does not exist"
+          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+      return null;
+    }
+
+    Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
+    FSDataInputStream summaryStream = getSummaryStream(summaryPath);
+    if (summaryStream == null) {
+      LOG.info("Nothing to recover as summary file does not exist"
+          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
+          + ", summaryPath=" + summaryPath.toString());
+      return null;
+    }
+
+    Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
+    int dagCounter;
+    try {
+      dagCounter = copySummaryFile(summaryPath, summaryStream,
+          getSummaryPath(currentAttemptRecoveryDataDir), seenDAGs);
+    } finally {
+      closeQuietly(summaryStream);
+    }
+
+    // Set counter for next set of DAGs
+    dagAppMaster.setDAGCounter(dagCounter);
+
+    TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+    if (lastInProgressDAG == null) {
+      LOG.info("Nothing to recover as no uncompleted DAGs found");
+      return null;
+    }
+
+    LOG.info("Trying to recover dag from recovery file"
+        + ", dagId=" + lastInProgressDAG.toString()
+        + ", dataDir=" + previousAttemptRecoveryDataDir
+        + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
+
+    FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
+        previousAttemptRecoveryDataDir, lastInProgressDAG);
+    if (dagRecoveryStream == null) {
+      // Could not find data to recover
+      // Error out
+      throw new IOException("Could not find recovery data for last in progress DAG"
+          + ", dagId=" + lastInProgressDAG);
+    }
+
+    try {
+      DAGImpl recoveredDAG = null;
+
+      LOG.info("Copying DAG data into Current Attempt directory"
+          + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
+          lastInProgressDAG));
+      FSDataOutputStream newDAGRecoveryStream =
+          getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+      try {
+        while (dagRecoveryStream.available() > 0) {
+          HistoryEvent event;
+          try {
+            event = getNextEvent(dagRecoveryStream);
+          } catch (IOException ioe) {
+            // Treat an unreadable record as the end of usable data and keep
+            // whatever was replayed before it.
+            LOG.warn("Corrupt data found when trying to read next event", ioe);
+            break;
+          }
+          HistoryEventType eventType = event.getEventType();
+          recoveredDAG = applyRecoveryEvent(recoveredDAG, event,
+              lastInProgressDAG);
+          if (eventType == HistoryEventType.DAG_FINISHED) {
+            // If this is seen, nothing to recover. The finally blocks still
+            // close both DAG streams on this early return.
+            return recoveredDAG;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("[DAG RECOVERY]"
+                + " dagId=" + lastInProgressDAG
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+          }
+          newDAGRecoveryStream.writeInt(eventType.ordinal());
+          event.toProtoStream(newDAGRecoveryStream);
+        }
+        newDAGRecoveryStream.hsync();
+        newDAGRecoveryStream.close();
+        // Closed successfully; nothing left for the finally block to clean up
+        newDAGRecoveryStream = null;
+      } finally {
+        closeQuietly(newDAGRecoveryStream);
+      }
+
+      writeDataRecoveredFlag();
+      return recoveredDAG;
+    } finally {
+      closeQuietly(dagRecoveryStream);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8117619..45fb50a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
@@ -84,4 +85,6 @@ public interface DAG {
   Credentials getCredentials();
   
   UserGroupInformation getDagUGI();
+
+  DAGState restoreFromEvent(HistoryEvent historyEvent);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 8243b70..ac96681 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -80,4 +82,5 @@ public interface Task {
   
   public List<String> getDiagnostics();
 
+  TaskState restoreFromEvent(HistoryEvent historyEvent);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 0cc9163..2af1232 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -27,6 +27,7 @@ import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -36,7 +37,7 @@ import org.apache.tez.dag.records.TezVertexID;
  * Read only view of TaskAttempt.
  */
 public interface TaskAttempt {
-  
+
   public static class TaskAttemptStatus {
     public TaskAttemptState state;
     public DAGCounter localityCounter;
@@ -118,4 +119,7 @@ public interface TaskAttempt {
   public Task getTask();
   
   public boolean getIsRescheduled();
+
+  TaskAttemptState restoreFromEvent(HistoryEvent event);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9e7a0a7..807e9b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,6 +37,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
@@ -114,4 +116,7 @@ public interface Vertex extends Comparable<Vertex> {
   // TODO remove this once RootInputVertexManager is fixed to not use
   // internal apis
   AppContext getAppContext();
+
+  VertexState restoreFromEvent(HistoryEvent event);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 5a7af0a..7130c7a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -27,4 +27,5 @@ public enum VertexState {
   KILLED,
   ERROR,
   TERMINATING,
+  RECOVERING,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 476c688..741dcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -26,7 +26,7 @@ public enum DAGEventType {
   //Producer:Client
   DAG_KILL,
 
-  //Producer:MRAppMaster
+  //Producer:AM
   DAG_INIT,
   DAG_START,
 
@@ -44,4 +44,8 @@ public enum DAGEventType {
   DAG_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
   DAG_COUNTER_UPDATE,
+
+  // Event to trigger recovery
+  // Producer:AM
+  DAG_RECOVER
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 018bd3d..db3fd3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -60,5 +60,8 @@ public enum TaskAttemptEventType {
   
   // Producer: consumer destination vertex
   TA_OUTPUT_FAILED,
+
+  // Recovery
+  TA_RECOVER,
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
new file mode 100644
index 0000000..e7e59e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * A {@link TaskEventType#T_RECOVER} event for a task, optionally carrying the
+ * state the task should end up in after recovery.
+ */
+public class TaskEventRecoverTask extends TaskEvent {
+
+  /** Target state for recovery; null when the caller did not supply one. */
+  private final TaskState desiredState;
+
+  /**
+   * @param taskID task the event is addressed to
+   * @param desiredState target state, or null if none is requested
+   */
+  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) {
+    super(taskID, TaskEventType.T_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /** Creates a recovery event without a desired state (it will be null). */
+  public TaskEventRecoverTask(TezTaskID taskID) {
+    this(taskID, null);
+  }
+
+  /** @return the requested target state, or null if none was supplied */
+  public TaskState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index a0b99a9..4830ae0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -40,5 +40,9 @@ public enum TaskEventType {
   T_ATTEMPT_OUTPUT_CONSUMABLE,
   T_ATTEMPT_FAILED,
   T_ATTEMPT_SUCCEEDED,
-  T_ATTEMPT_KILLED
+  T_ATTEMPT_KILLED,
+
+  // Recovery event
+  T_RECOVER
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
new file mode 100644
index 0000000..34e45fe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
@@ -0,0 +1,48 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Recovery event ({@link VertexEventType#V_RECOVER}) addressed to a vertex,
+ * carrying the state the vertex should be recovered into.
+ */
+public class VertexEventRecoverVertex extends VertexEvent {
+
+  /** State the vertex should be recovered into. */
+  private final VertexState desiredState;
+
+  /**
+   * @param vertexId the vertex to recover
+   * @param desiredState the state to recover the vertex into
+   */
+  public VertexEventRecoverVertex(TezVertexID vertexId, VertexState desiredState) {
+    super(vertexId, VertexEventType.V_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /**
+   * @return the state the vertex should be recovered into
+   */
+  public VertexState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index a872ae2..69195db 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -27,13 +27,25 @@ public class VertexEventRouteEvent extends VertexEvent {
   
   final List<TezEvent> events;
 
+  final boolean recovered;
+
   public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+    this(vertexId, events, false);
+  }
+
+  public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events,
+      boolean recovered) {
     super(vertexId, VertexEventType.V_ROUTE_EVENT);
     this.events = events;
+    this.recovered = recovered;
   }
-  
+
   public List<TezEvent> getEvents() {
     return events;
   }
 
+  public boolean isRecovered() {
+    return recovered;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
new file mode 100644
index 0000000..5e61369
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+
+import java.util.List;
+
+/**
+ * Recovery event ({@link VertexEventType#V_SOURCE_VERTEX_RECOVERED}) that
+ * informs a vertex of the recovered state of one of its source vertices.
+ */
+public class VertexEventSourceVertexRecovered extends VertexEvent {
+
+  private final VertexState sourceVertexState;
+  private final TezVertexID sourceVertexID;
+  private final List<TezTaskAttemptID> completedTaskAttempts;
+
+  /**
+   * @param vertexID the vertex this event is addressed to
+   * @param sourceVertexID the source vertex that was recovered
+   * @param sourceVertexState the recovered state of the source vertex
+   * @param completedTaskAttempts the completed task attempts reported for the
+   *     source vertex; the list is stored as given, not copied
+   */
+  public VertexEventSourceVertexRecovered(TezVertexID vertexID,
+      TezVertexID sourceVertexID,
+      VertexState sourceVertexState,
+      List<TezTaskAttemptID> completedTaskAttempts) {
+    super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
+    this.sourceVertexState = sourceVertexState;
+    this.sourceVertexID = sourceVertexID;
+    this.completedTaskAttempts = completedTaskAttempts;
+  }
+
+  /** @return the recovered state of the source vertex */
+  public VertexState getSourceVertexState() {
+    return sourceVertexState;
+  }
+
+  /** @return the ID of the recovered source vertex */
+  public TezVertexID getSourceVertexID() {
+    return sourceVertexID;
+  }
+
+  /** @return the completed task attempts list passed to the constructor */
+  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+    return completedTaskAttempts;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 0cf14eb..69952d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -56,5 +56,11 @@ public enum VertexEventType {
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,
   V_ROOT_INPUT_FAILED,
-  
+
+  // Recover Event, Producer:DAG
+  V_RECOVER,
+
+  // Recover Event, Producer:Vertex
+  V_SOURCE_VERTEX_RECOVERED
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 04ed223..432c189 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -90,12 +90,15 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -157,6 +160,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private final List<String> diagnostics = new ArrayList<String>();
 
+  // Recovery related flags
+  boolean recoveryInitEventSeen = false;
+  boolean recoveryStartEventSeen = false;
+
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
@@ -176,6 +183,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           .addTransition(DAGState.NEW, DAGState.NEW,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(DAGState.NEW,
+              EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING,
+                  DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
+                  DAGState.ERROR, DAGState.TERMINATING),
+              DAGEventType.DAG_RECOVER,
+              new RecoverTransition())
           .addTransition(DAGState.NEW, DAGState.NEW,
               DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
@@ -342,7 +355,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   
   Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
-  
+  private DAGState recoveredState = DAGState.NEW;
+  private boolean recoveryCommitInProgress = false;
+
   static class VertexGroupInfo {
     String groupName;
     Set<String> groupMembers;
@@ -466,6 +481,46 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public DAGState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case DAG_INITIALIZED:
+        recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
+        recoveryInitEventSeen = true;
+        return recoveredState;
+      case DAG_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new RuntimeException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
+        recoveredState = DAGState.RUNNING;
+        return recoveredState;
+      case DAG_COMMIT_STARTED:
+        if (recoveredState != DAGState.RUNNING) {
+          throw new RuntimeException("Commit Started Event seen but"
+              + " recovered state is not RUNNING, recoveredState=" + recoveredState);
+        }
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case DAG_FINISHED:
+        if (!recoveryStartEventSeen) {
+          // Not fatal: a zero-vertex DAG fails in init with no Start Event.
+          LOG.warn("Finished Event seen but no Start Event was"
+              + " encountered earlier, dagId=" + getID());
+        }
+        recoveryCommitInProgress = false;
+        DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
+        this.finishTime = finishedEvent.getFinishTime();
+        recoveredState = finishedEvent.getState();
+        return recoveredState;
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();
@@ -666,6 +721,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     boolean failedWhileCommitting = false;
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
       // commit all shared outputs
+      appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+          new DAGCommitStartedEvent(getID())));
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
         if (failedWhileCommitting) {
           break;
@@ -820,7 +877,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   void logJobHistoryFinishedEvent() {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
+        finishTime, DAGState.SUCCEEDED, "", getAllCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -839,7 +896,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(dagId, startEvt));
   }
 
-  void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
+  void logJobHistoryUnsuccesfulEvent(DAGState state) {
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
@@ -947,11 +1004,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
     }
     
-    DAGStatus.State logState = getDAGStatusFromState(finalState);
-    if (logState == DAGStatus.State.SUCCEEDED) {
+    if (finalState == DAGState.SUCCEEDED) {
       logJobHistoryFinishedEvent();
     } else {
-      logJobHistoryUnsuccesfulEvent(logState);
+      logJobHistoryUnsuccesfulEvent(finalState);
     }
     
     eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
@@ -1047,179 +1103,294 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return null;
   }
 
-  // TODO Recovery
-  /*
-  @Override
-  public List<AMInfo> getAMInfos() {
-    return amInfos;
+  public DAGState initializeDAG() {
+    return initializeDAG(null);
   }
-  */
-  
-  private static class InitTransition
-      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
-    /**
-     * Note that this transition method is called directly (and synchronously)
-     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
-     * just plain sequential call within AM context), so we can trigger
-     * modifications in AM state from here (at least, if AM is written that
-     * way; MR version is).
-     */
-    @Override
-    public DAGState transition(DAGImpl dag, DAGEvent event) {
-      // TODO Metrics
-      //dag.metrics.submittedJob(dag);
-      //dag.metrics.preparingJob(dag);
+  DAGState initializeDAG(DAGInitializedEvent event) {
+    if (event != null) {
+      initTime = event.getInitTime();
+    } else {
+      initTime = clock.getTime();
+    }
 
-      dag.initTime = dag.clock.getTime();
-      dag.commitAllOutputsOnSuccess = dag.conf.getBoolean(
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+    commitAllOutputsOnSuccess = conf.getBoolean(
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+
+    // If we have no vertices, fail the dag
+    numVertices = getJobPlan().getVertexCount();
+    if (numVertices == 0) {
+      addDiagnostic("No vertices for dag");
+      trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
+      if (event != null) {
+        return DAGState.FAILED;
+      }
+      return finished(DAGState.FAILED);
+    }
 
-      // If we have no vertices, fail the dag
-      dag.numVertices = dag.getJobPlan().getVertexCount();
-      if (dag.numVertices == 0) {
-        dag.addDiagnostic("No vertices for dag");
-        dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-        return dag.finished(DAGState.FAILED);
+    if (jobPlan.getVertexGroupsCount() > 0) {
+      for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
+        vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
       }
-      
-      if (dag.jobPlan.getVertexGroupsCount() > 0) {
-        for (PlanVertexGroupInfo groupInfo : dag.jobPlan.getVertexGroupsList()) {
-          dag.vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
-        }
-        for (VertexGroupInfo groupInfo : dag.vertexGroups.values()) {
-          for (String vertexName : groupInfo.groupMembers) {
-            List<VertexGroupInfo> groupList = dag.vertexGroupInfo.get(vertexName);
-            if (groupList == null) {
-              groupList = Lists.newLinkedList();
-              dag.vertexGroupInfo.put(vertexName, groupList);
-            }
-            groupList.add(groupInfo);
+      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+        for (String vertexName : groupInfo.groupMembers) {
+          List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
+          if (groupList == null) {
+            groupList = Lists.newLinkedList();
+            vertexGroupInfo.put(vertexName, groupList);
           }
+          groupList.add(groupInfo);
         }
       }
+    }
 
-      // create the vertices`
-      for (int i=0; i < dag.numVertices; ++i) {
-        String vertexName = dag.getJobPlan().getVertex(i).getName();
-        VertexImpl v = createVertex(dag, vertexName, i);
-        dag.addVertex(v);
-      }
+    // create the vertices`
+    for (int i=0; i < numVertices; ++i) {
+      String vertexName = getJobPlan().getVertex(i).getName();
+      VertexImpl v = createVertex(this, vertexName, i);
+      addVertex(v);
+    }
 
-      createDAGEdges(dag);
-      Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+    createDAGEdges(this);
+    Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
 
-      // setup the dag
-      for (Vertex v : dag.vertices.values()) {
-        parseVertexEdges(dag, edgePlans, v);
-      }
+    // setup the dag
+    for (Vertex v : vertices.values()) {
+      parseVertexEdges(this, edgePlans, v);
+    }
 
-      // Initialize the edges, now that the payload and vertices have been set.
-      for (Edge e : dag.edges.values()) {
-        e.initialize();
-      }
+    // Initialize the edges, now that the payload and vertices have been set.
+    for (Edge e : edges.values()) {
+      e.initialize();
+    }
 
-      assignDAGScheduler(dag);
-      
-      for (Map.Entry<String, VertexGroupInfo> entry : dag.vertexGroups.entrySet()) {
-        String groupName = entry.getKey();
-        VertexGroupInfo groupInfo = entry.getValue();
-        if (!groupInfo.outputs.isEmpty()) {
-          // shared outputs
-          for (String vertexName : groupInfo.groupMembers) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Setting shared outputs for group: " + groupName + 
-                  " on vertex: " + vertexName);
-            }
-            Vertex v = dag.getVertex(vertexName);
-            v.addSharedOutputs(groupInfo.outputs);
+    assignDAGScheduler(this);
+
+    for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
+      String groupName = entry.getKey();
+      VertexGroupInfo groupInfo = entry.getValue();
+      if (!groupInfo.outputs.isEmpty()) {
+        // shared outputs
+        for (String vertexName : groupInfo.groupMembers) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting shared outputs for group: " + groupName +
+                " on vertex: " + vertexName);
           }
+          Vertex v = getVertex(vertexName);
+          v.addSharedOutputs(groupInfo.outputs);
         }
       }
+    }
+    return DAGState.INITED;
+  }
 
-      // TODO Metrics
-      //dag.metrics.endPreparingJob(dag);
-      dag.logJobHistoryInitedEvent();
-      return DAGState.INITED;
-
+  private void createDAGEdges(DAGImpl dag) {
+    for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
+      EdgeProperty edgeProperty = DagTypeConverters
+          .createEdgePropertyMapFromDAGPlan(edgePlan);
+
+      // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
+      // referencing the fake edge manager within the API module.
+      if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
+          && edgeProperty.getEdgeManagerDescriptor() == null) {
+        EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+            NullEdgeManager.class.getName());
+        EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
+            edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
+            edgeProperty.getEdgeDestination());
+        edgeProperty = ep;
+      }
 
+      // edge manager may be also set via API when using custom edge type
+      dag.edges.put(edgePlan.getId(),
+          new Edge(edgeProperty, dag.getEventHandler()));
     }
+  }
 
-    private void createDAGEdges(DAGImpl dag) {
-      for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
-        EdgeProperty edgeProperty = DagTypeConverters
-            .createEdgePropertyMapFromDAGPlan(edgePlan);
-        
-        // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
-        // referencing the fake edge manager within the API module.
-        if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
-            && edgeProperty.getEdgeManagerDescriptor() == null) {
-          EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-              NullEdgeManager.class.getName());
-          EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
-              edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
-              edgeProperty.getEdgeDestination());
-          edgeProperty = ep;
-        }
-        
-        // edge manager may be also set via API when using custom edge type
-        dag.edges.put(edgePlan.getId(),
-            new Edge(edgeProperty, dag.getEventHandler()));
-      }
+  private static void assignDAGScheduler(DAGImpl dag) {
+    LOG.info("Using Natural order dag scheduler");
+    dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+  }
+
+  private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
+    TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+
+    VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
+    VertexLocationHint vertexLocationHint = DagTypeConverters
+        .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+
+    VertexImpl v = new VertexImpl(
+        vertexId, vertexPlan, vertexName, dag.conf,
+        dag.eventHandler, dag.taskAttemptListener,
+        dag.clock, dag.taskHeartbeatHandler,
+        !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
+        dag.vertexGroups);
+    return v;
+  }
+
+  // hooks up this VertexImpl to input and output EdgeProperties
+  private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+    VertexPlan vertexPlan = vertex.getVertexPlan();
+
+    Map<Vertex, Edge> inVertices =
+        new HashMap<Vertex, Edge>();
+
+    Map<Vertex, Edge> outVertices =
+        new HashMap<Vertex, Edge>();
+
+    for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(inEdgeId);
+      Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+      Edge edge = dag.edges.get(inEdgeId);
+      edge.setSourceVertex(inVertex);
+      edge.setDestinationVertex(vertex);
+      inVertices.put(inVertex, edge);
     }
 
-    private void assignDAGScheduler(DAGImpl dag) {
-      LOG.info("Using Natural order dag scheduler");
-      dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+    for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(outEdgeId);
+      Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+      Edge edge = dag.edges.get(outEdgeId);
+      edge.setSourceVertex(vertex);
+      edge.setDestinationVertex(outVertex);
+      outVertices.put(outVertex, edge);
     }
 
-    private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
-      TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+    vertex.setInputVertices(inVertices);
+    vertex.setOutputVertices(outVertices);
+  }
 
-      VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
-      VertexLocationHint vertexLocationHint = DagTypeConverters
-          .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+  private static class RecoverTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      switch (dag.recoveredState) {
+        case NEW:
+          // send DAG an Init and start events
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+          return DAGState.NEW;
+        case INITED:
+          // DAG inited but not started
+          // This implies vertices need to be sent init event
+          // Root vertices need to be sent start event
+          // The vertices may already have been sent these events but the
+          // DAG start may not have been persisted
+          for (Vertex v : dag.vertices.values()) {
+            if (v.getInputVerticesCount() == 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending Running Recovery event to root vertex "
+                    + v.getName());
+              }
+              dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+                  VertexState.RUNNING));
+            }
+          }
+          return DAGState.RUNNING;
+        case RUNNING:
+          // if commit is in progress, DAG should fail as commits are not
+          // recoverable
+          if (dag.recoveryCommitInProgress) {
+            // Fail the DAG as we have not seen a completion
+            dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
+            dag.setFinishTime();
+            // Recover all other data for all vertices
+            // send recover event to all vertices with a final end state
+            for (Vertex v : dag.vertices.values()) {
+              VertexState desiredState = VertexState.SUCCEEDED;
+              if (dag.recoveredState.equals(DAGState.KILLED)) {
+                desiredState = VertexState.KILLED;
+              } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
+                  dag.recoveredState)) {
+                desiredState = VertexState.FAILED;
+              }
+              dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+                  desiredState));
+            }
+            dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+            dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+                DAGState.FAILED));
+            return DAGState.FAILED;
+          }
+          for (Vertex v : dag.vertices.values()) {
+            if (v.getInputVerticesCount() == 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending Running Recovery event to root vertex "
+                    + v.getName());
+              }
+              dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+                  VertexState.RUNNING));
+            }
+          }
+          return DAGState.RUNNING;
+        case SUCCEEDED:
+        case ERROR:
+        case FAILED:
+        case KILLED:
+          // Completed
+
+          // Recover all other data for all vertices
+          // send recover event to all vertices with a final end state
+          for (Vertex v : dag.vertices.values()) {
+            VertexState desiredState = VertexState.SUCCEEDED;
+            if (dag.recoveredState.equals(DAGState.KILLED)) {
+              desiredState = VertexState.KILLED;
+            } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
+                dag.recoveredState)) {
+              desiredState = VertexState.FAILED;
+            }
+            dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+                desiredState));
+          }
 
-      VertexImpl v = new VertexImpl(
-          vertexId, vertexPlan, vertexName, dag.conf,
-          dag.eventHandler, dag.taskAttemptListener, 
-          dag.clock, dag.taskHeartbeatHandler,
-          !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-          dag.vertexGroups);
-      return v;
+          // Let us inform AM of completion
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              dag.recoveredState));
+
+          LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
+              + dag.recoveredState);
+          return dag.recoveredState;
+        default:
+          // Error state
+          LOG.warn("Trying to recover DAG, failed to recover"
+              + " from non-handled state" + dag.recoveredState);
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              DAGState.ERROR));
+          return DAGState.ERROR;
+      }
     }
 
-    // hooks up this VertexImpl to input and output EdgeProperties
-    private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
-      VertexPlan vertexPlan = vertex.getVertexPlan();
+  }
 
-      Map<Vertex, Edge> inVertices =
-          new HashMap<Vertex, Edge>();
+  private static class InitTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
-      Map<Vertex, Edge> outVertices =
-          new HashMap<Vertex, Edge>();
+    /**
+     * Note that this transition method is called directly (and synchronously)
+     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
+     * just plain sequential call within AM context), so we can trigger
+     * modifications in AM state from here (at least, if AM is written that
+     * way; MR version is).
+     */
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      // TODO Metrics
+      //dag.metrics.submittedJob(dag);
+      //dag.metrics.preparingJob(dag);
 
-      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(inEdgeId);
-        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
-        Edge edge = dag.edges.get(inEdgeId);
-        edge.setSourceVertex(inVertex);
-        edge.setDestinationVertex(vertex);
-        inVertices.put(inVertex, edge);
+      DAGState state = dag.initializeDAG();
+      if (state != DAGState.INITED) {
+        return state;
       }
 
-      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(outEdgeId);
-        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
-        Edge edge = dag.edges.get(outEdgeId);
-        edge.setSourceVertex(vertex);
-        edge.setDestinationVertex(outVertex);
-        outVertices.put(outVertex, edge);
-      }
+      // TODO Metrics
+      //dag.metrics.endPreparingJob(dag);
+      dag.logJobHistoryInitedEvent();
+      return DAGState.INITED;
+
 
-      vertex.setInputVertices(inVertices);
-      vertex.setOutputVertices(outVertices);
     }
 
   } // end of InitTransition
@@ -1427,6 +1598,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         for (VertexGroupInfo groupInfo : commitList) {
           groupInfo.committed = true;
           Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+              new DAGCommitStartedEvent(dagId)));
           for (String outputName : groupInfo.outputs) {
             OutputCommitter committer = v.getOutputCommitters().get(outputName);
             LOG.info("Committing output: " + outputName);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index 070bd75..ff9e401 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -31,13 +31,15 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
   private final String vertexName;
   private final String outputName;
   private final byte[] userPayload;
+  private final int vertexIdx;
 
   public OutputCommitterContextImpl(ApplicationId applicationId,
       int dagAttemptNumber,
       String dagName,
       String vertexName,
       String outputName,
-      @Nullable byte[] userPayload) {
+      @Nullable byte[] userPayload,
+      int vertexIdx) {
     checkNotNull(applicationId, "applicationId is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
@@ -48,6 +50,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
     this.vertexName = vertexName;
     this.outputName = outputName;
     this.userPayload = userPayload;
+    this.vertexIdx = vertexIdx;
   }
 
   @Override
@@ -80,4 +83,9 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
     return userPayload;
   }
 
+  @Override
+  public int getVertexIndex() {
+    return vertexIdx; // value passed as the constructor's vertexIdx argument
+  }
+
 }