You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/14 02:47:32 UTC

[1/2] TEZ-904. Committer recovery events should be out-of-band. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 693f2ca92 -> f58508a56


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 5986657..0074a4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -18,6 +18,16 @@
 
 package org.apache.tez.dag.history.recovery;
 
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,16 +43,6 @@ import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class RecoveryService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -120,7 +120,7 @@ public class RecoveryService extends AbstractService {
           synchronized (lock) {
             try {
               ++eventsProcessed;
-              handleEvent(event);
+              handleRecoveryEvent(event);
             } catch (Exception e) {
               // TODO handle failures - treat as fatal or ignore?
               LOG.warn("Error handling recovery event", e);
@@ -175,26 +175,56 @@ public class RecoveryService extends AbstractService {
       return;
     }
 
-    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-      || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-      // handle submissions and completion immediately
+    TezDAGID dagId = event.getDagID();
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return;
+      }
+    }
+    if (dagId == null || skippedDAGs.contains(dagId)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping event for DAG"
+            + ", eventType=" + eventType
+            + ", dagId=" + (dagId == null ? "null" : dagId.toString())
+            + ", isSkippedDAG=" + (dagId == null ? "null"
+            : skippedDAGs.contains(dagId)));
+      }
+      return;
+    }
+
+    if (event.getHistoryEvent() instanceof SummaryEvent) {
       synchronized (lock) {
         try {
-          handleEvent(event);
+          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+          handleSummaryEvent(dagId, eventType, summaryEvent);
           summaryStream.hsync();
-          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-            if (outputStreamMap.containsKey(event.getDagID())) {
-              doFlush(outputStreamMap.get(event.getDagID()),
-                  appContext.getClock().getTime(), true);
+          if (summaryEvent.writeToRecoveryImmediately()) {
+            handleRecoveryEvent(event);
+            doFlush(outputStreamMap.get(event.getDagID()),
+                appContext.getClock().getTime(), true);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
+                  + eventType.name());
             }
-          } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
-            completedDAGs.add(event.getDagID());
-            if (outputStreamMap.containsKey(event.getDagID())) {
+            eventQueue.add(event);
+          }
+          if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+            LOG.info("DAG completed"
+                + ", dagId=" + event.getDagID()
+                + ", queueSize=" + eventQueue.size());
+            completedDAGs.add(dagId);
+            if (outputStreamMap.containsKey(dagId)) {
               try {
-                doFlush(outputStreamMap.get(event.getDagID()),
-                    appContext.getClock().getTime(), true);
-                outputStreamMap.get(event.getDagID()).close();
-                outputStreamMap.remove(event.getDagID());
+                outputStreamMap.get(dagId).close();
+                outputStreamMap.remove(dagId);
               } catch (IOException ioe) {
                 LOG.warn("Error when trying to flush/close recovery file for"
                     + " dag, dagId=" + event.getDagID());
@@ -207,87 +237,71 @@ public class RecoveryService extends AbstractService {
           LOG.warn("Error handling recovery event", e);
         }
       }
-      LOG.info("DAG completed"
-          + ", dagId=" + event.getDagID()
-          + ", queueSize=" + eventQueue.size());
     } else {
       // All other events just get queued
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Queueing Recovery event of type " + eventType.name());
+        LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
       }
       eventQueue.add(event);
     }
   }
 
+  private void handleSummaryEvent(TezDAGID dagID,
+      HistoryEventType eventType,
+      SummaryEvent summaryEvent) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling summary event"
+          + ", dagID=" + dagID
+          + ", eventType=" + eventType);
+    }
+    try {
+      if (summaryStream == null) {
+        Path summaryPath = new Path(recoveryPath,
+            appContext.getApplicationID()
+                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+        if (!recoveryDirFS.exists(summaryPath)) {
+          summaryStream = recoveryDirFS.create(summaryPath, false,
+              bufferSize);
+        } else {
+          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing recovery event to summary stream"
+            + ", dagId=" + dagID
+            + ", eventType=" + eventType);
+      }
+      summaryEvent.toSummaryProtoStream(summaryStream);
+    } catch (IOException ioe) {
+      // FIXME handle failures
+      LOG.warn("Failed to write to stream", ioe);
+    }
+
 
-  private void handleEvent(DAGHistoryEvent event) {
+  }
+
+  private void handleRecoveryEvent(DAGHistoryEvent event) {
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling recovery event of type "
           + event.getHistoryEvent().getEventType());
     }
-    if (event.getDagID() == null) {
-      // AM event
-      // anything to be done?
-      // TODO
-      LOG.info("Skipping Recovery Event as DAG is null"
-          + ", eventType=" + event.getHistoryEvent().getEventType());
-      return;
-    }
-
     TezDAGID dagID = event.getDagID();
-    if (completedDAGs.contains(dagID)
-        || skippedDAGs.contains(dagID)) {
-      // Skip events for completed and skipped DAGs
+
+    if (completedDAGs.contains(dagID)) {
       // no need to recover completed DAGs
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping Recovery Event as either completed or skipped"
+        LOG.debug("Skipping Recovery Event as DAG completed"
             + ", dagId=" + dagID
             + ", completed=" + completedDAGs.contains(dagID)
             + ", skipped=" + skippedDAGs.contains(dagID)
-            + ", eventType=" + event.getHistoryEvent().getEventType());
+            + ", eventType=" + eventType);
       }
       return;
     }
 
     try {
 
-      if (summaryStream == null) {
-        Path summaryPath = new Path(recoveryPath,
-            appContext.getApplicationID()
-                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
-        if (!recoveryDirFS.exists(summaryPath)) {
-          summaryStream = recoveryDirFS.create(summaryPath, false,
-              bufferSize);
-        } else {
-          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
-        }
-      }
-
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-          DAGSubmittedEvent dagSubmittedEvent =
-              (DAGSubmittedEvent) event.getHistoryEvent();
-          String dagName = dagSubmittedEvent.getDAGName();
-          if (dagName != null
-              && dagName.startsWith(
-              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
-            // Skip recording pre-warm DAG events
-            skippedDAGs.add(dagID);
-            return;
-          }
-        }
-        SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Writing recovery event to summary stream"
-              + ", dagId=" + dagID
-              + ", type="
-              + event.getHistoryEvent().getEventType());
-        }
-        summaryEvent.toSummaryProtoStream(summaryStream);
-      }
-
       if (!outputStreamMap.containsKey(dagID)) {
         Path dagFilePath = new Path(recoveryPath,
             dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
@@ -313,8 +327,7 @@ public class RecoveryService extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Writing recovery event to output stream"
             + ", dagId=" + dagID
-            + ", type="
-            + event.getHistoryEvent().getEventType());
+            + ", eventType=" + eventType);
       }
       ++unflushedEventsCount;
       outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
@@ -339,11 +352,9 @@ public class RecoveryService extends AbstractService {
         && ((currentTime - lastFlushTime) >= (flushInterval*1000))) {
       doFlush = true;
     }
-
     if (!doFlush) {
       return;
     }
-
     doFlush(outputStream, currentTime, false);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 65f3aaf..c640baa 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -59,6 +59,7 @@ message DAGStartedProto {
 message DAGCommitStartedProto {
   optional string dag_id = 1;
 }
+
 message DAGFinishedProto {
   optional string dag_id = 1;
   optional int64 finish_time = 2;
@@ -99,6 +100,20 @@ message VertexCommitStartedProto {
   optional string vertex_id = 1;
 }
 
+message VertexCommitFinishedProto {
+  optional string vertex_id = 1;
+}
+
+message VertexGroupCommitStartedProto {
+  optional string dag_id = 1;
+  optional string vertex_group_name = 2;
+}
+
+message VertexGroupCommitFinishedProto {
+  optional string dag_id = 1;
+  optional string vertex_group_name = 2;
+}
+
 message VertexFinishedProto {
   optional string vertex_name = 1;
   optional string vertex_id = 2;
@@ -162,4 +177,10 @@ message SummaryEventProto {
   optional string dag_id = 1;
   optional int64 timestamp = 2;
   optional int32 event_type = 3;
+  optional bytes event_payload = 4;
+}
+
+message VertexFinishStateProto {
+  optional string vertex_id = 1;
+  optional int32 state = 2;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index f03863c..2cf3eaf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -35,10 +35,12 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -78,6 +80,26 @@ public class TestHistoryEventsProtoConversion {
     return deserializedEvent;
   }
 
+  private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent)
+      throws IOException {
+    SummaryEvent event = (SummaryEvent) historyEvent;
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    HistoryEvent deserializedEvent = null;
+    event.toSummaryProtoStream(os);
+    os.flush();
+    os.close();
+    LOG.info("Serialized event to byte array"
+        + ", eventType=" + historyEvent.getEventType()
+        + ", bufLen=" + os.toByteArray().length);
+    SummaryEventProto summaryEventProto =
+        SummaryEventProto.parseDelimitedFrom(
+            new ByteArrayInputStream(os.toByteArray()));
+    deserializedEvent = RuntimeUtils.createClazzInstance(
+        event.getClass().getName());
+    ((SummaryEvent)deserializedEvent).fromSummaryProtoStream(summaryEventProto);
+    return deserializedEvent;
+  }
+
   private void logEvents(HistoryEvent event,
       HistoryEvent deserializedEvent) {
     LOG.info("Initial Event toString: " + event.toString());
@@ -485,7 +507,7 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGCommitStartedEvent() throws Exception {
     DAGCommitStartedEvent event = new DAGCommitStartedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100l);
     DAGCommitStartedEvent deserializedEvent =
         (DAGCommitStartedEvent) testProtoConversion(event);
     Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
@@ -495,13 +517,56 @@ public class TestHistoryEventsProtoConversion {
   private void testVertexCommitStartedEvent() throws Exception {
     VertexCommitStartedEvent event = new VertexCommitStartedEvent(
         TezVertexID.getInstance(
-            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), 100l);
     VertexCommitStartedEvent deserializedEvent =
         (VertexCommitStartedEvent) testProtoConversion(event);
     Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
     logEvents(event, deserializedEvent);
   }
 
+  private void testVertexGroupCommitStartedEvent() throws Exception {
+    VertexGroupCommitStartedEvent event = new VertexGroupCommitStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
+        "fooGroup", 1000344l);
+    {
+      VertexGroupCommitStartedEvent deserializedEvent =
+          (VertexGroupCommitStartedEvent) testProtoConversion(event);
+      Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexGroupCommitStartedEvent deserializedEvent =
+          (VertexGroupCommitStartedEvent) testSummaryProtoConversion(event);
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testVertexGroupCommitFinishedEvent() throws Exception {
+    VertexGroupCommitFinishedEvent event = new VertexGroupCommitFinishedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
+        "fooGroup", 1000344l);
+    {
+      VertexGroupCommitFinishedEvent deserializedEvent =
+          (VertexGroupCommitFinishedEvent) testProtoConversion(event);
+      Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexGroupCommitFinishedEvent deserializedEvent =
+          (VertexGroupCommitFinishedEvent) testSummaryProtoConversion(event);
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+
   @Test
   public void testDefaultProtoConversion() throws Exception {
     for (HistoryEventType eventType : HistoryEventType.values()) {
@@ -560,6 +625,12 @@ public class TestHistoryEventsProtoConversion {
         case VERTEX_COMMIT_STARTED:
           testVertexCommitStartedEvent();
           break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          testVertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          testVertexGroupCommitFinishedEvent();
+          break;
         default:
           throw new Exception("Unhandled Event type in Unit tests: " + eventType);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
new file mode 100644
index 0000000..2716fdd
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+public class TestDAGRecovery2 {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGRecovery2.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestDAGRecovery2.class.getName() + "-tmpDir";
+  protected static MiniDFSCluster dfsCluster;
+
+  private static TezSession tezSession = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    FileSystem remoteFs = null;
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestDAGRecovery2.class.getName(),
+          1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
+      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestDAGRecovery2", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(100);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(100);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  @Test(timeout=120000)
+  public void testBasicRecovery() throws Exception {
+    DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
+    OutputDescriptor od =
+        new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());
+    od.setUserPayload(new
+        MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
+            .toUserPayload());
+    dag.getVertex("v3").addOutput("FailingOutput", od,
+        MultiAttemptDAG.FailingOutputCommitter.class);
+    runDAGAndVerify(dag, State.FAILED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 086e458..6a25f13 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -216,6 +216,11 @@ public class TestProcessor implements LogicalIOProcessor {
         LOG.info(msg);
     }
     for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+      if (!(entry.getValue() instanceof TestOutput)) {
+        LOG.info("Ignoring non TestOutput: " + entry.getKey()
+            + " outputClass= " + entry.getValue().getClass().getSimpleName());
+        continue;
+      }
       LOG.info("Writing output: " + entry.getKey() + " sum= " + sum);
       TestOutput output = (TestOutput) entry.getValue();
       output.write(sum);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 65c8383..80d8588 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.test.dag;
 
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Ints;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,15 +32,24 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -56,6 +67,10 @@ public class MultiAttemptDAG {
       "tez.multi-attempt-dag.vertex.num-tasks";
   public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
 
+  public static String MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER =
+      "tez.multi-attempt-dag.use-failing-committer";
+  public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;
+
   public static class FailOnAttemptVertexManagerPlugin
       implements VertexManagerPlugin {
     private int numSourceTasks = 0;
@@ -130,6 +145,100 @@ public class MultiAttemptDAG {
     }
   }
 
+  public static class FailingOutputCommitter extends OutputCommitter {
+
+    boolean failOnCommit = false;
+
+    @Override
+    public void initialize(OutputCommitterContext context) throws Exception {
+      FailingOutputCommitterConfig config = new
+          FailingOutputCommitterConfig();
+      config.fromUserPayload(context.getUserPayload());
+      failOnCommit = config.failOnCommit;
+    }
+
+    @Override
+    public void setupOutput() throws Exception {
+
+    }
+
+    @Override
+    public void commitOutput() throws Exception {
+      if (failOnCommit) {
+        LOG.info("Committer causing AM to shutdown");
+        Runtime.getRuntime().halt(-1);
+      }
+    }
+
+    @Override
+    public void abortOutput(State finalState) throws Exception {
+
+    }
+
+    public static class FailingOutputCommitterConfig {
+      boolean failOnCommit;
+
+      public FailingOutputCommitterConfig() {
+        this(false);
+      }
+
+      public FailingOutputCommitterConfig(boolean failOnCommit) {
+        this.failOnCommit = failOnCommit;
+      }
+
+      public byte[] toUserPayload() {
+        return Ints.toByteArray((failOnCommit ? 1 : 0));
+      }
+
+      public void fromUserPayload(byte[] userPayload) {
+        int failInt = Ints.fromByteArray(userPayload);
+        if (failInt == 0) {
+          failOnCommit = false;
+        } else {
+          failOnCommit = true;
+        }
+      }
+    }
+  }
+
+  public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {
+
+    @Override
+    public void setNumPhysicalOutputs(int numOutputs) {
+
+    }
+
+    @Override
+    public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+      outputContext.requestInitialMemory(1l, this);
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public Writer getWriter() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> outputEvents) {
+
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void memoryAssigned(long assignedSize) {
+    }
+  }
+
 
   public static DAG createDAG(String name,
       Configuration conf) throws Exception {


[2/2] git commit: TEZ-904. Committer recovery events should be out-of-band. (hitesh)

Posted by hi...@apache.org.
TEZ-904. Committer recovery events should be out-of-band. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f58508a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f58508a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f58508a5

Branch: refs/heads/master
Commit: f58508a5692c817a11b7996ba563ba8a26b85560
Parents: 693f2ca
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 13 18:47:05 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 13 18:47:05 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  77 ++--
 .../org/apache/tez/dag/app/RecoveryParser.java  | 354 +++++++++++++++----
 .../dag/app/dag/event/DAGEventRecoverEvent.java |  37 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  73 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 114 +++---
 .../apache/tez/dag/history/HistoryEvent.java    |   8 +-
 .../tez/dag/history/HistoryEventType.java       |   4 +-
 .../apache/tez/dag/history/SummaryEvent.java    |  11 +
 .../history/events/DAGCommitStartedEvent.java   |  34 +-
 .../dag/history/events/DAGFinishedEvent.java    |  33 +-
 .../dag/history/events/DAGSubmittedEvent.java   |  23 +-
 .../events/VertexCommitStartedEvent.java        |  41 ++-
 .../dag/history/events/VertexFinishedEvent.java |  51 ++-
 .../events/VertexGroupCommitFinishedEvent.java  | 132 +++++++
 .../events/VertexGroupCommitStartedEvent.java   | 132 +++++++
 .../dag/history/recovery/RecoveryService.java   | 181 +++++-----
 tez-dag/src/main/proto/HistoryEvents.proto      |  21 ++
 .../TestHistoryEventsProtoConversion.java       |  75 +++-
 .../org/apache/tez/test/TestDAGRecovery2.java   | 144 ++++++++
 .../java/org/apache/tez/test/TestProcessor.java |   5 +
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 109 ++++++
 21 files changed, 1390 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9df8752..9a01090 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -99,6 +100,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -108,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -1371,28 +1374,19 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private DAG recoverDAG() throws IOException {
-    DAG recoveredDAG = null;
+  private RecoveredDAGData recoverDAG() throws IOException { // Parses recovery data from previous attempts; dispatching DAG_RECOVER is now left to the caller.
     if (recoveryEnabled) {
       if (this.appAttemptID.getAttemptId() > 1) {
+        LOG.info("Recovering data from previous attempts"
+            + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
         this.state = DAGAppMasterState.RECOVERING;
         RecoveryParser recoveryParser = new RecoveryParser(
             this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
-        recoveredDAG = recoveryParser.parseRecoveryData();
-        if (recoveredDAG != null) {
-          LOG.info("Found DAG to recover, dagId=" + recoveredDAG.getID());
-          _updateLoggers(recoveredDAG, "");
-          DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAG.getID(),
-              DAGEventType.DAG_RECOVER);
-          dagEventDispatcher.handle(recoverDAGEvent);
-          this.state = DAGAppMasterState.RUNNING;
-        } else {
-          LOG.info("No DAG to recover");
-          this.state = DAGAppMasterState.IDLE;
-        }
+        RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData(); // null when the parser finds no DAG in the summary data
+        return recoveredDAGData;
       }
     }
-    return recoveredDAG;
+    return null; // recovery disabled, or this is the first attempt: nothing to recover
   }
 
   @SuppressWarnings("unchecked")
@@ -1415,20 +1409,56 @@ public class DAGAppMaster extends AbstractService {
 
     this.lastDAGCompletionTime = clock.getTime();
 
+    RecoveredDAGData recoveredDAGData = recoverDAG();
+
     if (!isSession) {
-      DAG recoveredDAG = null;
-      if (appAttemptID.getAttemptId() != 1) {
-        recoveredDAG = recoverDAG();
+      LOG.info("In Non-Session mode.");
+    } else {
+      LOG.info("In Session mode. Waiting for DAG over RPC");
+      this.state = DAGAppMasterState.IDLE;
+    }
+
+    if (recoveredDAGData != null) {
+      if (recoveredDAGData.isCompleted
+          || recoveredDAGData.nonRecoverable) {
+        LOG.info("Found previous DAG in completed or non-recoverable state"
+            + ", dagId=" + recoveredDAGData.recoveredDagID
+            + ", isCompleted=" + recoveredDAGData.isCompleted
+            + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
+            + ", state=" + (recoveredDAGData.dagState == null ? "null" :
+                recoveredDAGData.dagState)
+            + ", failureReason=" + recoveredDAGData.reason);
+        _updateLoggers(recoveredDAGData.recoveredDAG, "");
+        if (recoveredDAGData.nonRecoverable) {
+          DAGEventRecoverEvent recoverDAGEvent =
+              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+                  DAGState.FAILED);
+          dagEventDispatcher.handle(recoverDAGEvent);
+          this.state = DAGAppMasterState.RUNNING;
+        } else {
+          DAGEventRecoverEvent recoverDAGEvent =
+              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+                  recoveredDAGData.dagState);
+          dagEventDispatcher.handle(recoverDAGEvent);
+          this.state = DAGAppMasterState.RUNNING;
+        }
+      } else {
+        LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
+        _updateLoggers(recoveredDAGData.recoveredDAG, "");
+        DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
+            DAGEventType.DAG_RECOVER);
+        dagEventDispatcher.handle(recoverDAGEvent);
+        this.state = DAGAppMasterState.RUNNING;
       }
-      if (recoveredDAG == null) {
+    } else {
+      if (!isSession) {
+        // No dag recovered - in non-session, just restart the original DAG
         dagCounter.set(0);
         startDAG();
       }
-    } else {
-      LOG.info("In Session mode. Waiting for DAG over RPC");
-      this.state = DAGAppMasterState.IDLE;
-      recoverDAG();
+    }
 
+    if (isSession) {
       this.dagSubmissionTimer = new Timer(true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
         @Override
@@ -1436,7 +1466,6 @@ public class DAGAppMaster extends AbstractService {
           checkAndHandleSessionTimeout();
         }
       }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9e59849..7e1feca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,13 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,7 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -37,9 +44,9 @@ import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
@@ -48,17 +55,15 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 
 public class RecoveryParser {
 
@@ -88,6 +93,15 @@ public class RecoveryParser {
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
   }
 
+  public static class RecoveredDAGData { // Outcome of parseRecoveryData() for the one DAG selected for recovery
+    public TezDAGID recoveredDagID = null; // set from recoveredDAG.getID() once the DAG has been re-created
+    public DAGImpl recoveredDAG = null; // DAG rebuilt from the DAG_SUBMITTED event; stays null until that event is replayed
+    public DAGState dagState = null; // final state taken from a DAGFinishedEvent; null if none was seen
+    public boolean isCompleted = false; // true when the DAG was found to have already finished
+    public boolean nonRecoverable = false; // true when the DAG cannot be recovered (e.g. a commit was in progress)
+    public String reason = null; // explanation recorded together with nonRecoverable
+  }
+
   private static void parseSummaryFile(FSDataInputStream inputStream)
       throws IOException {
     while (inputStream.available() > 0) {
@@ -149,6 +163,12 @@ public class RecoveryParser {
       case VERTEX_COMMIT_STARTED:
         event = new VertexCommitStartedEvent();
         break;
+      case VERTEX_GROUP_COMMIT_STARTED:
+        event = new VertexGroupCommitStartedEvent();
+        break;
+      case VERTEX_GROUP_COMMIT_FINISHED:
+        event = new VertexGroupCommitFinishedEvent();
+        break;
       case VERTEX_FINISHED:
         event = new VertexFinishedEvent();
         break;
@@ -266,18 +286,25 @@ public class RecoveryParser {
     return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
   }
 
-  private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
-    TezDAGID inProgressDAG = null;
-    for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+  private DAGSummaryData getLastCompletedOrInProgressDAG( // Returns the single in-progress DAG if any, else the completed DAG with the highest id, else null.
+      Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
+    DAGSummaryData inProgressDAG = null;
+    DAGSummaryData lastCompletedDAG = null;
+    for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) {
+      if (!entry.getValue().completed) {
         if (inProgressDAG != null) {
           throw new RuntimeException("Multiple in progress DAGs seen"
-              + ", dagId=" + inProgressDAG
+              + ", dagId=" + inProgressDAG.dagId
               + ", dagId=" + entry.getKey());
         }
-        inProgressDAG = entry.getKey();
+        inProgressDAG = entry.getValue();
+      } else if (lastCompletedDAG == null || entry.getKey().compareTo(lastCompletedDAG.dagId) > 0) {
+        lastCompletedDAG = entry.getValue(); // the map's iteration order is not guaranteed, so choose by highest dag id
       }
     }
+    if (inProgressDAG == null) {
+      return lastCompletedDAG;
+    }
     return inProgressDAG;
   }
 
@@ -305,8 +332,134 @@ public class RecoveryParser {
     return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
   }
 
+  private static class DAGSummaryData { // Per-DAG state accumulated while reading the recovery summary file
+
+    final TezDAGID dagId;
+    boolean completed = false; // true once DAG_FINISHED is seen; reset to false by DAG_SUBMITTED
+    boolean dagCommitCompleted = true; // false from DAG_COMMIT_STARTED until DAG_FINISHED is seen
+    DAGState dagState; // state read from the DAG_FINISHED summary event
+    Map<TezVertexID, Boolean> vertexCommitStatus =
+        new HashMap<TezVertexID, Boolean>(); // vertexId -> true once VERTEX_FINISHED follows VERTEX_COMMIT_STARTED
+    Map<String, Boolean> vertexGroupCommitStatus =
+        new HashMap<String, Boolean>(); // group name -> true once VERTEX_GROUP_COMMIT_FINISHED is seen
+    List<HistoryEvent> bufferedSummaryEvents =
+        new ArrayList<HistoryEvent>(); // summary events kept so they can be replayed onto the recovered DAG later
+
+    DAGSummaryData(TezDAGID dagId) {
+      this.dagId = dagId;
+    }
+
+    void handleSummaryEvent(SummaryEventProto proto) throws IOException { // Folds one summary event into this DAG's completion/commit state
+      HistoryEventType eventType =
+          HistoryEventType.values()[proto.getEventType()]; // NOTE(review): assumes the proto's integer is the enum ordinal - confirm against the writer
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[RECOVERY SUMMARY]"
+            + " dagId=" + proto.getDagId()
+            + ", timestamp=" + proto.getTimestamp()
+            + ", event=" + eventType);
+      }
+      switch (eventType) {
+        case DAG_SUBMITTED:
+          completed = false;
+          break;
+        case DAG_FINISHED:
+          completed = true;
+          dagCommitCompleted = true;
+          DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
+          dagFinishedEvent.fromSummaryProtoStream(proto);
+          dagState = dagFinishedEvent.getState();
+          break;
+        case DAG_COMMIT_STARTED:
+          dagCommitCompleted = false;
+          break;
+        case VERTEX_COMMIT_STARTED:
+          VertexCommitStartedEvent vertexCommitStartedEvent =
+              new VertexCommitStartedEvent();
+          vertexCommitStartedEvent.fromSummaryProtoStream(proto);
+          vertexCommitStatus.put(
+              vertexCommitStartedEvent.getVertexID(), false);
+          break;
+        case VERTEX_FINISHED:
+          VertexFinishedEvent vertexFinishedEvent =
+              new VertexFinishedEvent();
+          vertexFinishedEvent.fromSummaryProtoStream(proto);
+          if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) { // only vertices that logged a commit start are tracked and buffered
+            vertexCommitStatus.put(
+                vertexFinishedEvent.getVertexID(), true);
+            bufferedSummaryEvents.add(vertexFinishedEvent);
+          }
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+              new VertexGroupCommitStartedEvent();
+          vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
+          bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
+          vertexGroupCommitStatus.put(
+              vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+              new VertexGroupCommitFinishedEvent();
+          vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
+          bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
+          vertexGroupCommitStatus.put(
+              vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+          break;
+      } // no default case: any other event type leaves the state untouched
+    }
 
-  public DAG parseRecoveryData() throws IOException {
+    @Override
+    public String toString() { // Renders dagId, the completed flag and any recorded vertex / vertex-group commit statuses
+      StringBuilder sb = new StringBuilder();
+      sb.append("dagId=").append(dagId);
+      sb.append(", dagCompleted=").append(completed);
+      if (!vertexCommitStatus.isEmpty()) {
+        sb.append(", vertexCommitStatuses=[");
+        for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
+          sb.append("{ vertexId=").append(entry.getKey())
+              .append(", committed=").append(entry.getValue()).append("}, ");
+        }
+        sb.append("]");
+      }
+      if (!vertexGroupCommitStatus.isEmpty()) {
+        sb.append(", vertexGroupCommitStatuses=[");
+        for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
+          sb.append("{ vertexGroup=").append(entry.getKey())
+              .append(", committed=").append(entry.getValue()).append("}, ");
+        }
+        sb.append("]");
+      }
+      return sb.toString();
+    }
+  }
+
+  private String isDAGRecoverable(DAGSummaryData data) { // Despite the name: returns null when recoverable, otherwise the reason it is not
+    if (!data.dagCommitCompleted) { // a DAG commit started without a following DAG_FINISHED
+      return "DAG Commit was in progress, not recoverable"
+          + ", dagId=" + data.dagId;
+    }
+    if (!data.vertexCommitStatus.isEmpty()) {
+      for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
+        if (!(entry.getValue().booleanValue())) { // vertex commit started but its finish was never recorded
+          return "Vertex Commit was in progress, not recoverable"
+              + ", dagId=" + data.dagId
+              + ", vertexId=" + entry.getKey();
+        }
+      }
+    }
+    if (!data.vertexGroupCommitStatus.isEmpty()) {
+      for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
+        if (!(entry.getValue().booleanValue())) { // group commit started but its finish was never recorded
+          return "Vertex Group Commit was in progress, not recoverable"
+              + ", dagId=" + data.dagId
+              + ", vertexGroup=" + entry.getKey();
+        }
+      }
+    }
+    return null; // no commit was left in progress
+  }
+
+  public RecoveredDAGData parseRecoveryData() throws IOException {
     Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
     LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
         + " for recovering data from previous attempt");
@@ -330,8 +483,6 @@ public class RecoveryParser {
     FSDataOutputStream newSummaryStream =
         getSummaryOutputStream(newSummaryPath);
 
-    Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
-
     FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
     LOG.info("Parsing summary file"
         + ", path=" + summaryPath.toString()
@@ -339,6 +490,8 @@ public class RecoveryParser {
         + ", lastModTime=" + summaryFileStatus.getModificationTime());
 
     int dagCounter = 0;
+    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
+        new HashMap<TezDAGID, DAGSummaryData>();
     while (summaryStream.available() > 0) {
       RecoveryProtos.SummaryEventProto proto =
           RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
@@ -354,11 +507,10 @@ public class RecoveryParser {
       if (dagCounter < dagId.getId()) {
         dagCounter = dagId.getId();
       }
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-        seenDAGs.put(dagId, false);
-      } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        seenDAGs.put(dagId, true);
+      if (!dagSummaryDataMap.containsKey(dagId)) {
+        dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
       }
+      dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
       proto.writeDelimitedTo(newSummaryStream);
     }
     newSummaryStream.hsync();
@@ -367,12 +519,35 @@ public class RecoveryParser {
     // Set counter for next set of DAGs
     dagAppMaster.setDAGCounter(dagCounter);
 
-    TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+    DAGSummaryData lastInProgressDAGData =
+        getLastCompletedOrInProgressDAG(dagSummaryDataMap);
+    if (lastInProgressDAGData == null) {
+      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
+      return null;
+    }
+    TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
     if (lastInProgressDAG == null) {
-      LOG.info("Nothing to recover as no uncompleted DAGs found");
+      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
       return null;
     }
 
+    LOG.info("Checking if DAG is in recoverable state"
+        + ", dagId=" + lastInProgressDAGData.dagId);
+
+    final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
+    if (lastInProgressDAGData.completed) {
+      recoveredDAGData.isCompleted = true;
+      recoveredDAGData.dagState = lastInProgressDAGData.dagState;
+    }
+
+    String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
+    if (nonRecoverableReason != null) {
+      LOG.warn("Found last inProgress DAG but not recoverable: "
+          + lastInProgressDAGData);
+      recoveredDAGData.nonRecoverable = true;
+      recoveredDAGData.reason = nonRecoverableReason;
+    }
+
     LOG.info("Trying to recover dag from recovery file"
         + ", dagId=" + lastInProgressDAG.toString()
         + ", dataDir=" + previousAttemptRecoveryDataDir
@@ -387,14 +562,13 @@ public class RecoveryParser {
           + ", dagId=" + lastInProgressDAG);
     }
 
-    DAGImpl recoveredDAG = null;
-
     LOG.info("Copying DAG data into Current Attempt directory"
         + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
         lastInProgressDAG));
     FSDataOutputStream newDAGRecoveryStream =
         getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
 
+    boolean skipAllOtherEvents = false;
     while (dagRecoveryStream.available() > 0) {
       HistoryEvent event;
       try {
@@ -403,8 +577,9 @@ public class RecoveryParser {
         LOG.warn("Corrupt data found when trying to read next event", ioe);
         break;
       }
-      if (event == null) {
+      if (event == null || skipAllOtherEvents) {
         // reached end of data
+        event = null;
         break;
       }
       HistoryEventType eventType = event.getEventType();
@@ -414,9 +589,13 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+          recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
               lastInProgressDAG);
-          dagAppMaster.setCurrentDAG(recoveredDAG);
+          recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
+          dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
+          if (recoveredDAGData.nonRecoverable) {
+            skipAllOtherEvents = true;
+          }
           break;
         }
         case DAG_INITIALIZED:
@@ -424,8 +603,8 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_STARTED:
@@ -433,8 +612,8 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_COMMIT_STARTED:
@@ -442,8 +621,26 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case VERTEX_GROUP_COMMIT_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case VERTEX_GROUP_COMMIT_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_FINISHED:
@@ -452,13 +649,16 @@ public class RecoveryParser {
               + ", eventType=" + eventType
               + ", event=" + event.toString());
           // If this is seen, nothing to recover
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
-          return recoveredDAG;
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          recoveredDAGData.isCompleted = true;
+          recoveredDAGData.dagState =
+              ((DAGFinishedEvent) event).getState();
+          skipAllOtherEvents = true;
         }
         case CONTAINER_LAUNCHED:
         {
-          // Nothing to do?
+          // Nothing to do for now
           break;
         }
         case VERTEX_INITIALIZED:
@@ -466,9 +666,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -477,9 +677,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexStartedEvent vEvent = (VertexStartedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -488,9 +688,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -499,9 +699,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -510,9 +710,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -521,9 +721,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskStartedEvent tEvent = (TaskStartedEvent) event;
-          Task task = recoveredDAG.getVertex(
+          Task task = recoveredDAGData.recoveredDAG.getVertex(
               tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
           task.restoreFromEvent(tEvent);
           break;
@@ -533,9 +733,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
-          Task task = recoveredDAG.getVertex(
+          Task task = recoveredDAGData.recoveredDAG.getVertex(
               tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
           task.restoreFromEvent(tEvent);
           break;
@@ -545,10 +745,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
           Task task =
-              recoveredDAG.getVertex(
+              recoveredDAGData.recoveredDAG.getVertex(
                   tEvent.getTaskAttemptID().getTaskID().getVertexID())
                       .getTask(tEvent.getTaskAttemptID().getTaskID());
           task.restoreFromEvent(tEvent);
@@ -559,10 +759,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
           Task task =
-              recoveredDAG.getVertex(
+              recoveredDAGData.recoveredDAG.getVertex(
                   tEvent.getTaskAttemptID().getTaskID().getVertexID())
                   .getTask(tEvent.getTaskAttemptID().getTaskID());
           task.restoreFromEvent(tEvent);
@@ -573,10 +773,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexDataMovementEventsGeneratedEvent vEvent =
               (VertexDataMovementEventsGeneratedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -596,6 +796,42 @@ public class RecoveryParser {
     newDAGRecoveryStream.hsync();
     newDAGRecoveryStream.close();
 
+    if (!recoveredDAGData.isCompleted
+        && !recoveredDAGData.nonRecoverable) {
+      if (lastInProgressDAGData.bufferedSummaryEvents != null
+        && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
+        for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
+          assert recoveredDAGData.recoveredDAG != null;
+          switch (bufferedEvent.getEventType()) {
+            case VERTEX_GROUP_COMMIT_STARTED:
+              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+              break;
+            case VERTEX_GROUP_COMMIT_FINISHED:
+              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+              break;
+            case VERTEX_FINISHED:
+              VertexFinishedEvent vertexFinishedEvent =
+                  (VertexFinishedEvent) bufferedEvent;
+              Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
+                  vertexFinishedEvent.getVertexID());
+              if (vertex == null) {
+                recoveredDAGData.nonRecoverable = true;
+                recoveredDAGData.reason = "All state could not be recovered"
+                    + ", vertex completed but events not flushed"
+                    + ", vertexId=" + vertexFinishedEvent.getVertexID();
+              } else {
+                vertex.restoreFromEvent(vertexFinishedEvent);
+              }
+              break;
+            default:
+              throw new RuntimeException("Invalid data found in buffered summary events"
+                  + ", unknown event type "
+                  + bufferedEvent.getEventType());
+          }
+        }
+      }
+    }
+
     Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
         dataRecoveredFileFlag);
     LOG.info("Finished copying data from previous attempt into current attempt"
@@ -607,7 +843,7 @@ public class RecoveryParser {
     flagFile.hsync();
     flagFile.close();
 
-    return recoveredDAG;
+    return recoveredDAGData;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
new file mode 100644
index 0000000..e64ad13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.records.TezDAGID;
+/** DAG event of type DAG_RECOVER that carries the state the DAG should be recovered into. */
+public class DAGEventRecoverEvent extends DAGEvent {
+
+  private final DAGState desiredState; // target state handed to the DAG on recovery
+
+  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) {
+    super(dagId, DAGEventType.DAG_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  public DAGState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 432c189..8fc278f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -54,11 +55,11 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -73,32 +74,35 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.DAGReport;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -357,6 +361,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
   private DAGState recoveredState = DAGState.NEW;
   private boolean recoveryCommitInProgress = false;
+  Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
 
   static class VertexGroupInfo {
     String groupName;
@@ -497,18 +502,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         recoveredState = DAGState.RUNNING;
         return recoveredState;
       case DAG_COMMIT_STARTED:
-        if (recoveredState != DAGState.RUNNING) {
-          throw new RuntimeException("Commit Started Event seen but"
-              + " recovered state is not RUNNING"
-              + ", recoveredState=" + recoveredState);
-        }
         recoveryCommitInProgress = true;
         return recoveredState;
+      case VERTEX_GROUP_COMMIT_STARTED:
+        VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+            (VertexGroupCommitStartedEvent) historyEvent;
+        recoveredGroupCommits.put(
+            vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+        return recoveredState;
+      case VERTEX_GROUP_COMMIT_FINISHED:
+        VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+            (VertexGroupCommitFinishedEvent) historyEvent;
+        recoveredGroupCommits.put(
+            vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+        return recoveredState;
       case DAG_FINISHED:
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Start Event was encountered earlier");
-        }
         recoveryCommitInProgress = false;
         DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
         this.finishTime = finishedEvent.getFinishTime();
@@ -722,7 +730,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
       // commit all shared outputs
       appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-          new DAGCommitStartedEvent(getID())));
+          new DAGCommitStartedEvent(getID(), clock.getTime())));
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
         if (failedWhileCommitting) {
           break;
@@ -1266,6 +1274,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     @Override
     public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      if (dagEvent instanceof DAGEventRecoverEvent) {
+        // DAG completed or final end state known
+        DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+        dag.recoveredState = recoverEvent.getDesiredState();
+      }
+
       switch (dag.recoveredState) {
         case NEW:
           // send DAG an Init and start events
@@ -1292,8 +1306,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         case RUNNING:
           // if commit is in progress, DAG should fail as commits are not
           // recoverable
-          if (dag.recoveryCommitInProgress) {
-            // Fail the DAG as we have not seen a completion
+          // A group commit counts as in progress only when its STARTED event
+          // was recovered without a matching FINISHED event (value == false)
+          boolean groupCommitInProgress = false;
+          for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
+            if (!entry.getValue().booleanValue()) {
+              LOG.info("Found a pending Vertex Group commit"
+                  + ", vertexGroup=" + entry.getKey());
+              groupCommitInProgress = true;
+            }
+          }
+
+          if (groupCommitInProgress || dag.recoveryCommitInProgress) {
+            // Fail the DAG as we have not seen a commit completion
             dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
             dag.setFinishTime();
             // Recover all other data for all vertices
@@ -1314,6 +1339,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
                 DAGState.FAILED));
             return DAGState.FAILED;
           }
+
           for (Vertex v : dag.vertices.values()) {
             if (v.getInputVerticesCount() == 0) {
               if (LOG.isDebugEnabled()) {
@@ -1596,10 +1622,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           }
         }
         for (VertexGroupInfo groupInfo : commitList) {
+          if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
+            LOG.info("VertexGroup was already committed as per recovery"
+                + " data, groupName=" + groupInfo.groupName);
+            continue;
+          }
           groupInfo.committed = true;
           Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
           appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-              new DAGCommitStartedEvent(dagId)));
+              new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+                  clock.getTime())));
           for (String outputName : groupInfo.outputs) {
             OutputCommitter committer = v.getOutputCommitters().get(outputName);
             LOG.info("Committing output: " + outputName);
@@ -1612,6 +1644,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           if (failedCommit) {
             break;
           }
+          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+              new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+                  clock.getTime())));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3ca9e11..7b3b6b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,6 +94,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -106,7 +107,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -131,9 +131,9 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -146,7 +146,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
-
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -529,6 +528,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   private String logIdentifier;
   private boolean recoveryCommitInProgress = false;
+  private boolean summaryCompleteSeen = false;
+  private boolean hasCommitter = false;
+  private boolean vertexCompleteSeen = false;
   private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
 
   // Recovery related flags
@@ -905,20 +907,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         return recoveredState;
       case VERTEX_COMMIT_STARTED:
-        if (recoveredState != VertexState.RUNNING) {
-          throw new RuntimeException("Commit Started Event seen but"
-              + " recovered state is not RUNNING"
-              + ", recoveredState=" + recoveredState);
-        }
         recoveryCommitInProgress = true;
+        hasCommitter = true;
         return recoveredState;
       case VERTEX_FINISHED:
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Started Event was encountered earlier");
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        if (finishedEvent.isFromSummary()) {
+          summaryCompleteSeen  = true;
+        } else {
+          vertexCompleteSeen = true;
         }
         recoveryCommitInProgress = false;
-        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
         recoveredState = finishedEvent.getState();
         diagnostics.add(finishedEvent.getDiagnostics());
         finishTime = finishedEvent.getFinishTime();
@@ -1280,16 +1279,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
-          if (vertex.outputCommitters != null) {
-            vertex.appContext.getHistoryHandler().handle(
-                new DAGHistoryEvent(vertex.getDAGId(),
-                    new VertexCommitStartedEvent(vertex.vertexId)));
-          }
           if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
             // commit only once. Dont commit shared outputs
             LOG.info("Invoking committer commit for vertex, vertexId="
                 + vertex.logIdentifier);
-            if (vertex.outputCommitters != null) {
+            if (vertex.outputCommitters != null
+                && !vertex.outputCommitters.isEmpty()) {
+              boolean firstCommit = true;
               for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
                 final OutputCommitter committer = entry.getValue();
                 final String outputName = entry.getKey();
@@ -1297,6 +1293,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // dont commit shared committers. Will be committed by the DAG
                   continue;
                 }
+                if (firstCommit) {
+                  // Log the commit start event exactly once, just before the
+                  // first non-shared output is committed
+                  firstCommit = false;
+                  vertex.appContext.getHistoryHandler().handle(
+                      new DAGHistoryEvent(vertex.getDAGId(),
+                          new VertexCommitStartedEvent(vertex.vertexId,
+                              vertex.clock.getTime())));
+                }
                 vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
                   @Override
                   public Void run() throws Exception {
@@ -1806,31 +1811,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case SUCCEEDED:
         case FAILED:
         case KILLED:
-          vertex.tasksNotYetScheduled = false;
-          // recover tasks
-          if (vertex.tasks != null) {
-            TaskState taskState = TaskState.KILLED;
-            switch (vertex.recoveredState) {
-              case SUCCEEDED:
-                taskState = TaskState.SUCCEEDED;
-                break;
-              case KILLED:
-                taskState = TaskState.KILLED;
-                break;
-              case FAILED:
-                taskState = TaskState.FAILED;
-                break;
-            }
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState));
-            }
-            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
-            endState = VertexState.RUNNING;
+          if (vertex.recoveredState == VertexState.SUCCEEDED
+              && vertex.hasCommitter
+              && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
+            LOG.warn("Cannot recover vertex as all recovery events not"
+                + " found, vertex=" + vertex.logIdentifier
+                + ", hasCommitters=" + vertex.hasCommitter
+                + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
+                + ", finalCompletionSeen=" + vertex.vertexCompleteSeen);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
           } else {
-            endState = vertex.recoveredState;
-            vertex.finished(endState);
+            vertex.tasksNotYetScheduled = false;
+            // recover tasks
+            if (vertex.tasks != null) {
+              TaskState taskState = TaskState.KILLED;
+              switch (vertex.recoveredState) {
+                case SUCCEEDED:
+                  taskState = TaskState.SUCCEEDED;
+                  break;
+                case KILLED:
+                  taskState = TaskState.KILLED;
+                  break;
+                case FAILED:
+                  taskState = TaskState.FAILED;
+                  break;
+              }
+              for (Task task : vertex.tasks.values()) {
+                vertex.eventHandler.handle(
+                    new TaskEventRecoverTask(task.getTaskId(),
+                        taskState));
+              }
+              vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+              endState = VertexState.RUNNING;
+            } else {
+              endState = vertex.recoveredState;
+              vertex.finished(endState);
+            }
           }
           break;
         default:
@@ -2318,12 +2336,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             " asked to split by: " + originalSplitSource + 
             " but was already split by:" + vertex.originalOneToOneSplitSource);
       }
-      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING, 
-          " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
-          " in state " + vertex.getState() + 
-          " . Split in vertex " + originalSplitSource + 
-          " sent by vertex " + splitEvent.getSenderVertex() +
-          " numTasks " + splitEvent.getNumTasks());
+      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+          " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+              " in state " + vertex.getState() +
+              " . Split in vertex " + originalSplitSource +
+              " sent by vertex " + splitEvent.getSenderVertex() +
+              " numTasks " + splitEvent.getNumTasks());
       LOG.info("Splitting vertex " + vertex.getVertexId() + 
           " because of split in vertex " + originalSplitSource + 
           " sent by vertex " + splitEvent.getSenderVertex() +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 78d1208..3f756c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,16 +18,16 @@
 
 package org.apache.tez.dag.history;
 
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
 public interface HistoryEvent {
 
-  HistoryEventType getEventType();
+  public HistoryEventType getEventType();
 
   public JSONObject convertToATSJSON() throws JSONException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 7b2087a..219bfe3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,5 +36,7 @@ public enum HistoryEventType {
   CONTAINER_LAUNCHED,
   VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
   DAG_COMMIT_STARTED,
-  VERTEX_COMMIT_STARTED
+  VERTEX_COMMIT_STARTED,
+  VERTEX_GROUP_COMMIT_STARTED,
+  VERTEX_GROUP_COMMIT_FINISHED
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
index 587ee3e..eaae813 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
@@ -21,8 +21,19 @@ package org.apache.tez.dag.history;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+
 public interface SummaryEvent {
 
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException;
 
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException;
+
+  /**
+   * Whether to write this event immediately to the DAG recovery file.
+   * Summary events are always written immediately to the summary file.
+   * @return true if this event should be written to the DAG recovery file immediately
+   */
+  public boolean writeToRecoveryImmediately();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 6d5a769..627751a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -18,26 +18,31 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.utils.ProtoUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class DAGCommitStartedEvent implements HistoryEvent {
+public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
+  private long commitStartTime;
 
   public DAGCommitStartedEvent() {
   }
 
-  public DAGCommitStartedEvent(TezDAGID dagID) {
+  public DAGCommitStartedEvent(TezDAGID dagID, long commitStartTime) {
     this.dagID = dagID;
+    this.commitStartTime = commitStartTime;
   }
 
   @Override
@@ -91,4 +96,21 @@ public class DAGCommitStartedEvent implements HistoryEvent {
     return dagID;
   }
 
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    ProtoUtils.toSummaryEventProto(dagID, commitStartTime,
+        getEventType()).writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 38e0702..14381b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -18,6 +18,10 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -28,15 +32,15 @@ import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
-import org.apache.tez.dag.utils.ProtoUtils;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
 
 public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
@@ -166,8 +170,25 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
-    ProtoUtils.toSummaryEventProto(dagID, finishTime,
-        HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(finishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(ByteString.copyFrom(Ints.toByteArray(state.ordinal())));
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.finishTime = proto.getTimestamp();
+    this.state = DAGState.values()[
+        Ints.fromByteArray(proto.getEventPayload().toByteArray())];
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return true;
   }
 
   public long getFinishTime() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 853bea7..af83dc8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -18,6 +18,10 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -29,16 +33,13 @@ import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-      public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
   private long submitTime;
@@ -168,6 +169,17 @@ import java.io.OutputStream;
         HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream);
   }
 
+  /**
+   * Not supported for this event: it is never rebuilt from a summary record.
+   *
+   * @param proto summary record (unused)
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    throw new UnsupportedOperationException("Cannot re-initialize event from"
+        + " summary stream");
+  }
+
+  /**
+   * Always returns {@code true} for DAG submission events.
+   *
+   * NOTE(review): presumably this asks the recovery service to persist the
+   * event right away instead of deferring the write; confirm against
+   * SummaryEvent and RecoveryService.
+   */
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return true;
+  }
+
   public String getDAGName() {
     if (dagPlan != null && dagPlan.hasName()) {
       return dagPlan.getName();
@@ -190,4 +202,5 @@ import java.io.OutputStream;
   public long getSubmitTime() {
     return submitTime;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 564f9ed..387bff1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -18,26 +18,33 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.protobuf.ByteString;
 
-public class VertexCommitStartedEvent implements HistoryEvent {
+public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
+  private long commitStartTime;
 
   public VertexCommitStartedEvent() {
   }
 
-  public VertexCommitStartedEvent(TezVertexID vertexId) {
+  public VertexCommitStartedEvent(TezVertexID vertexId, long commitStartTime) {
     this.vertexID = vertexId;
+    this.commitStartTime = commitStartTime;
   }
 
   @Override
@@ -91,4 +98,28 @@ public class VertexCommitStartedEvent implements HistoryEvent {
     return this.vertexID;
   }
 
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto}.
+   * The record carries the owning DAG id, the commit start time as its
+   * timestamp, the ordinal of the event type, and the vertex id string as
+   * payload.
+   *
+   * @param outputStream stream the summary record is appended to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(vertexID.getDAGId().toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        // Encode explicitly as UTF-8 so the bytes do not depend on the
+        // default charset of the JVM that writes the record.
+        .setEventPayload(ByteString.copyFromUtf8(vertexID.toString()));
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Restores the vertex id and commit start time from a summary record
+   * produced by {@link #toSummaryProtoStream(OutputStream)}.
+   *
+   * @param proto summary record whose payload is the vertex id string
+   * @throws IOException declared by the interface; not thrown directly here
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    // Decode as UTF-8 to mirror the writer, independent of the reader's
+    // default charset.
+    this.vertexID = TezVertexID.fromString(
+        proto.getEventPayload().toStringUtf8());
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  /**
+   * Always returns {@code false} for vertex commit-started events.
+   *
+   * NOTE(review): presumably {@code false} lets the recovery service defer
+   * the write of this event; confirm against SummaryEvent and RecoveryService.
+   */
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 2366eb1..6f07c91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -18,25 +18,29 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class VertexFinishedEvent implements HistoryEvent {
+public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
   private String vertexName;
@@ -48,6 +52,7 @@ public class VertexFinishedEvent implements HistoryEvent {
   private VertexState state;
   private String diagnostics;
   private TezCounters tezCounters;
+  private boolean fromSummary = false;
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
@@ -186,4 +191,40 @@ public class VertexFinishedEvent implements HistoryEvent {
   public TezCounters getTezCounters() {
     return tezCounters;
   }
+
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto}.
+   * The payload is a {@link VertexFinishStateProto} holding the ordinal of
+   * the final vertex state and the vertex id; the finish time is stored as
+   * the record timestamp.
+   *
+   * @param outputStream stream the summary record is appended to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    // Payload: final state plus the id of the vertex it belongs to.
+    VertexFinishStateProto.Builder payloadBuilder = VertexFinishStateProto.newBuilder();
+    payloadBuilder.setState(state.ordinal());
+    payloadBuilder.setVertexId(vertexID.toString());
+    VertexFinishStateProto payload = payloadBuilder.build();
+
+    // Envelope: DAG id, timestamp and event type around the payload.
+    SummaryEventProto summary = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(vertexID.getDAGId().toString())
+        .setTimestamp(finishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(payload.toByteString())
+        .build();
+    summary.writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Restores vertex id, final state and finish time from a summary record
+   * and marks this instance as having been built from the summary.
+   *
+   * @param proto summary record whose payload is a serialized
+   *              {@link VertexFinishStateProto}
+   * @throws IOException if the payload cannot be parsed
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexFinishStateProto payload =
+        VertexFinishStateProto.parseFrom(proto.getEventPayload());
+    int stateOrdinal = payload.getState();
+
+    this.vertexID = TezVertexID.fromString(payload.getVertexId());
+    // NOTE(review): the ordinal is used unchecked; an out-of-range value
+    // fails with ArrayIndexOutOfBoundsException.
+    this.state = VertexState.values()[stateOrdinal];
+    this.finishTime = proto.getTimestamp();
+    this.fromSummary = true;
+  }
+
+  /**
+   * Always returns {@code false} for vertex finished events.
+   *
+   * NOTE(review): presumably {@code false} lets the recovery service defer
+   * the write of this event; confirm against SummaryEvent and RecoveryService.
+   */
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  /**
+   * @return {@code true} once this event has been populated through
+   *         {@code fromSummaryProtoStream}; {@code false} otherwise
+   */
+  public boolean isFromSummary() {
+    return fromSummary;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
new file mode 100644
index 0000000..99a5288
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_FINISHED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * The event is flagged as a recovery event and not as a history event. It
+ * can be serialized in two forms: the full record
+ * ({@link VertexGroupCommitFinishedProto}: DAG id and group name) and a
+ * summary record ({@link SummaryEventProto}) that wraps the full record and
+ * additionally carries the commit finish time as its timestamp. The commit
+ * finish time is therefore only restored when reading a summary record.
+ */
+public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitFinishTime;
+
+  /** Creates an empty event to be populated from a proto stream. */
+  public VertexGroupCommitFinishedEvent() {
+  }
+
+  /**
+   * @param dagID id of the DAG the vertex group belongs to
+   * @param vertexGroupName name of the vertex group
+   * @param commitFinishTime timestamp written into the summary record
+   */
+  public VertexGroupCommitFinishedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitFinishTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitFinishTime = commitFinishTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the full record for this event.
+   *
+   * @return proto holding the DAG id string and the vertex group name
+   */
+  public VertexGroupCommitFinishedProto toProto() {
+    return VertexGroupCommitFinishedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Restores DAG id and vertex group name from a full record. The commit
+   * finish time is not part of that record and is left untouched.
+   *
+   * @param proto record to read from; must not be {@code null}
+   */
+  public void fromProto(VertexGroupCommitFinishedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited full record from the stream.
+   *
+   * @param inputStream stream positioned at the start of a record
+   * @throws IOException if reading fails or the stream holds no record
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto =
+        VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null when the stream is already at EOF;
+    // report that as an I/O problem instead of failing with an NPE below.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto} whose
+   * payload is the serialized full record and whose timestamp is the commit
+   * finish time.
+   *
+   * @param outputStream stream the summary record is appended to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitFinishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Restores all fields from a summary record: DAG id and group name from
+   * the payload, commit finish time from the record timestamp.
+   *
+   * @param proto summary record to read from
+   * @throws IOException if the payload cannot be parsed
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitFinishedProto vertexGroupCommitFinishedProto =
+        VertexGroupCommitFinishedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitFinishedProto);
+    this.commitFinishTime = proto.getTimestamp();
+  }
+
+  /**
+   * NOTE(review): presumably {@code false} lets the recovery service defer
+   * the write of this event; confirm against SummaryEvent and RecoveryService.
+   *
+   * @return always {@code false}
+   */
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
new file mode 100644
index 0000000..04d6276
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_STARTED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * The event is flagged as a recovery event and not as a history event. It
+ * can be serialized in two forms: the full record
+ * ({@link VertexGroupCommitStartedProto}: DAG id and group name) and a
+ * summary record ({@link SummaryEventProto}) that wraps the full record and
+ * additionally carries the commit start time as its timestamp. The commit
+ * start time is therefore only restored when reading a summary record.
+ */
+public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitStartTime;
+
+  /** Creates an empty event to be populated from a proto stream. */
+  public VertexGroupCommitStartedEvent() {
+  }
+
+  /**
+   * @param dagID id of the DAG the vertex group belongs to
+   * @param vertexGroupName name of the vertex group
+   * @param commitStartTime timestamp written into the summary record
+   */
+  public VertexGroupCommitStartedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitStartTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitStartTime = commitStartTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_STARTED;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the full record for this event.
+   *
+   * @return proto holding the DAG id string and the vertex group name
+   */
+  public VertexGroupCommitStartedProto toProto() {
+    return VertexGroupCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Restores DAG id and vertex group name from a full record. The commit
+   * start time is not part of that record and is left untouched.
+   *
+   * @param proto record to read from; must not be {@code null}
+   */
+  public void fromProto(VertexGroupCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited full record from the stream.
+   *
+   * @param inputStream stream positioned at the start of a record
+   * @throws IOException if reading fails or the stream holds no record
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto =
+        VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null when the stream is already at EOF;
+    // report that as an I/O problem instead of failing with an NPE below.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto} whose
+   * payload is the serialized full record and whose timestamp is the commit
+   * start time.
+   *
+   * @param outputStream stream the summary record is appended to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Restores all fields from a summary record: DAG id and group name from
+   * the payload, commit start time from the record timestamp.
+   *
+   * @param proto summary record to read from
+   * @throws IOException if the payload cannot be parsed
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitStartedProto vertexGroupCommitStartedProto =
+        VertexGroupCommitStartedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitStartedProto);
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  /**
+   * NOTE(review): presumably {@code false} lets the recovery service defer
+   * the write of this event; confirm against SummaryEvent and RecoveryService.
+   *
+   * @return always {@code false}
+   */
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}