You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:25 UTC

[5/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index be67cb2..9a45859 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -19,8 +19,11 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -29,11 +32,21 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.RecoveryParser.DAGSummaryData;
-import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
+import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.dag.impl.TestDAGImpl;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -42,10 +55,27 @@ import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.*;
 
+import com.google.common.collect.Lists;
+
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -75,6 +105,7 @@ public class TestRecoveryParser {
     mockDAGImpl = mock(DAGImpl.class);
     when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl);
     parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3);
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
   private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) {
@@ -155,14 +186,14 @@ public class TestRecoveryParser {
         new DAGStartedEvent(dagID, 1L, "user", "dag1")));
     rService.stop();
 
-    RecoveredDAGData dagData = parser.parseRecoveryData();
+    DAGRecoveryData dagData = parser.parseRecoveryData();
     assertEquals(true, dagData.nonRecoverable);
     assertTrue(dagData.reason.contains("DAG Commit was in progress, not recoverable,"));
     // DAGSubmittedEvent is handled but DAGInitializedEvent and DAGStartedEvent in the next attempt are both skipped
     // due to the dag is not recoerable.
     verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
-    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGInitializedEvent.class));
-    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+    assertNull(dagData.getDAGInitializedEvent());
+    assertNull(dagData.getDAGStartedEvent());
   }
 
   // skipAllOtherEvents due to dag finished
@@ -202,7 +233,7 @@ public class TestRecoveryParser {
        new DAGStartedEvent(dagID, 1L, "user", "dag1")));
     rService.stop();
 
-    RecoveredDAGData dagData = parser.parseRecoveryData();
+    DAGRecoveryData dagData = parser.parseRecoveryData();
     assertEquals(false, dagData.nonRecoverable);
     assertEquals(DAGState.FAILED, dagData.dagState);
     assertEquals(true, dagData.isCompleted);
@@ -210,9 +241,8 @@ public class TestRecoveryParser {
     verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
     // DAGInitializedEvent may not been handled before DAGFinishedEvent,
     // because DAGFinishedEvent's writeToRecoveryImmediately is true
-    verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGFinishedEvent.class));
-    // DAGStartedEvent is skipped due to it is after DAGFinishedEvent
-    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+    assertNotNull(dagData.getDAGFinishedEvent());
+    assertNull(dagData.getDAGStartedEvent());
   }
 
   @Test(timeout = 5000)
@@ -250,13 +280,13 @@ public class TestRecoveryParser {
     rService.stop();
 
     // corrupted last records will be skipped but the whole recovery logs will be read
-    RecoveredDAGData dagData = parser.parseRecoveryData();
+    DAGRecoveryData dagData = parser.parseRecoveryData();
     assertEquals(false, dagData.isCompleted);
     assertEquals(null, dagData.reason);
     assertEquals(false, dagData.nonRecoverable);
     // verify DAGSubmitedEvent & DAGInititlizedEvent is handled.
     verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
-    verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class));
+    assertNotNull(dagData.getDAGInitializedEvent());
   }
 
   @Test(timeout = 5000)
@@ -293,4 +323,434 @@ public class TestRecoveryParser {
     }
   }
 
+  /** Commit started but never finished: the DAG must be flagged non-recoverable. */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_DAGInCommitting() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    TezDAGID dagId = TezDAGID.getInstance(applicationId, 1);
+    AppContext ctx = mock(AppContext.class);
+    when(ctx.getClock()).thenReturn(new SystemClock());
+    when(ctx.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(mockDAGImpl.getID()).thenReturn(dagId);
+
+    Configuration serviceConf = new Configuration();
+    serviceConf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    RecoveryService recoveryService = new RecoveryService(ctx);
+    recoveryService.init(serviceConf);
+    recoveryService.start();
+
+    DAGPlan plan = TestDAGImpl.createTestDAGPlan();
+    // The DAGSubmittedEvent opens the summary stream, so it is written before the commit start.
+    // No DAGFinishedEvent follows.
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, 1L, plan,
+        attemptId, null, "user", new Configuration(), null)));
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGCommitStartedEvent(dagId, 0L)));
+    recoveryService.stop();
+
+    DAGRecoveryData recovered = parser.parseRecoveryData();
+    assertEquals(dagId, recovered.recoveredDagID);
+    assertTrue(recovered.nonRecoverable);
+    assertTrue(recovered.reason.contains("DAG Commit was in progress"));
+  }
+  
+  /** A commit that started and then reached DAGFinishedEvent yields a completed, recoverable DAG. */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_DAGFinishCommitting() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagID);
+
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write a DAGSubmittedEvent first to initialize summaryStream; it reports the
+    // same attempt id as the DAGFinishedEvent below
+    rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan,
+        appAttemptId, null, "user", new Configuration(), null)));
+    // It should be fine to skip other events, just for testing.
+    rService.handle(new DAGHistoryEvent(dagID, new DAGCommitStartedEvent(dagID, 0L)));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null,
+            appAttemptId)));
+    rService.stop();
+
+    DAGRecoveryData dagData = parser.parseRecoveryData();
+    assertEquals(dagID, dagData.recoveredDagID);
+    assertEquals(DAGState.FAILED, dagData.dagState);
+    assertFalse(dagData.nonRecoverable);
+    assertNull(dagData.reason);
+    assertTrue(dagData.isCompleted);
+  }
+
+  /** Vertex commit started with no vertex finish logged: the DAG must be non-recoverable. */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_VertexInCommitting() throws IOException {
+    final ApplicationId app = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    final TezDAGID dag = TezDAGID.getInstance(app, 1);
+    final TezVertexID committingVertex = TezVertexID.getInstance(dag, 0);
+    final AppContext context = mock(AppContext.class);
+    when(context.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(context.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dag);
+
+    final RecoveryService service = new RecoveryService(context);
+    final Configuration cfg = new Configuration();
+    cfg.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    service.init(cfg);
+    service.start();
+
+    final DAGPlan plan = TestDAGImpl.createTestDAGPlan();
+    final ApplicationAttemptId attempt = ApplicationAttemptId.newInstance(app, 1);
+    // DAGSubmittedEvent opens the summary stream; then only the vertex commit start follows.
+    service.handle(new DAGHistoryEvent(dag,
+        new DAGSubmittedEvent(dag, 1L, plan, attempt, null, "user", new Configuration(), null)));
+    service.handle(new DAGHistoryEvent(dag, new VertexCommitStartedEvent(committingVertex, 0L)));
+    service.stop();
+
+    final DAGRecoveryData result = parser.parseRecoveryData();
+    assertEquals(dag, result.recoveredDagID);
+    assertTrue(result.nonRecoverable);
+    assertTrue(result.reason.contains("Vertex Commit was in progress"));
+  }
+
+  /** A vertex commit followed by its VertexFinishedEvent leaves the DAG recoverable. */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_VertexFinishCommitting() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagId = TezDAGID.getInstance(applicationId, 1);
+    AppContext ctx = mock(AppContext.class);
+    when(ctx.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(ctx.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagId);
+
+    Configuration serviceConf = new Configuration();
+    serviceConf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    RecoveryService recoveryService = new RecoveryService(ctx);
+    recoveryService.init(serviceConf);
+    recoveryService.start();
+
+    DAGPlan plan = TestDAGImpl.createTestDAGPlan();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    // The DAGSubmittedEvent must come first because it opens the summary stream.
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, 1L, plan,
+        attemptId, null, "user", new Configuration(), null)));
+
+    // Vertex 0 starts committing and then finishes successfully.
+    TezVertexID vertex0 = TezVertexID.getInstance(dagId, 0);
+    VertexFinishedEvent finished = new VertexFinishedEvent(vertex0, "v1", 10, 0L, 0L,
+        0L, 0L, 0L, VertexState.SUCCEEDED, "", null, null, null);
+    recoveryService.handle(new DAGHistoryEvent(dagId, new VertexCommitStartedEvent(vertex0, 0L)));
+    recoveryService.handle(new DAGHistoryEvent(dagId, finished));
+    recoveryService.stop();
+
+    DAGRecoveryData recovered = parser.parseRecoveryData();
+    assertEquals(dagId, recovered.recoveredDagID);
+    assertFalse(recovered.nonRecoverable);
+  }
+
+  /** A started but unfinished vertex group commit must make the DAG non-recoverable. */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_VertexGroupInCommitting() throws IOException {
+    ApplicationId app = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dag = TezDAGID.getInstance(app, 1);
+    AppContext context = mock(AppContext.class);
+    when(context.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(context.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dag);
+
+    RecoveryService service = new RecoveryService(context);
+    Configuration cfg = new Configuration();
+    cfg.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    service.init(cfg);
+    service.start();
+
+    DAGPlan plan = TestDAGImpl.createTestDAGPlan();
+    // DAGSubmittedEvent first, so that the summary stream exists.
+    service.handle(new DAGHistoryEvent(dag, new DAGSubmittedEvent(dag, 1L, plan,
+        ApplicationAttemptId.newInstance(app, 1), null, "user", new Configuration(), null)));
+
+    // group_1 = {vertex 0, vertex 1} starts committing; no finish event is written.
+    service.handle(new DAGHistoryEvent(dag, new VertexGroupCommitStartedEvent(dag, "group_1",
+        Lists.newArrayList(TezVertexID.getInstance(dag, 0), TezVertexID.getInstance(dag, 1)), 0L)));
+    service.stop();
+
+    DAGRecoveryData result = parser.parseRecoveryData();
+    assertEquals(dag, result.recoveredDagID);
+    assertTrue(result.nonRecoverable);
+    assertTrue(result.reason.contains("Vertex Group Commit was in progress"));
+  }
+  
+  /**
+   * Once the vertex group commit has finished and every member vertex has a
+   * VertexFinishedEvent, the DAG must be recoverable.
+   */
+  @Test(timeout=5000)
+  public void testRecoverableSummary_VertexGroupFinishCommitting() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagID);
+
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write a DAGSubmittedEvent first to initialize summaryStream
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration(), null)));
+    // It should be fine to skip other events, just for testing.
+    TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
+    TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexGroupCommitStartedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexGroupCommitFinishedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+    // also write VertexFinishedEvent, otherwise it is still non-recoverable
+    // when checking with non-summary event
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexFinishedEvent(v0, "v0", 10, 0L, 0L,
+            0L, 0L, 0L, VertexState.SUCCEEDED, "", null, null, null)));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexFinishedEvent(v1, "v1", 10, 0L, 0L,
+            0L, 0L, 0L, VertexState.SUCCEEDED, "", null, null, null)));
+    rService.stop();
+
+    DAGRecoveryData dagData = parser.parseRecoveryData();
+    assertEquals(dagID, dagData.recoveredDagID);
+    assertFalse(dagData.nonRecoverable);
+  }
+  
+  /**
+   * Only summary events reach the log (MockRecoveryService drops the rest), so a vertex that
+   * the summary says was committed cannot be fully recovered: the DAG is non-recoverable.
+   */
+  @Test(timeout=5000)
+  public void testRecoverableNonSummary1() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagId = TezDAGID.getInstance(applicationId, 1);
+    AppContext ctx = mock(AppContext.class);
+    when(ctx.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(ctx.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagId);
+
+    Configuration serviceConf = new Configuration();
+    serviceConf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    MockRecoveryService recoveryService = new MockRecoveryService(ctx);
+    recoveryService.init(serviceConf);
+    recoveryService.start();
+
+    DAGPlan plan = TestDAGImpl.createTestDAGPlan();
+    // DAGSubmittedEvent first, to open the summary stream.
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, 1L, plan,
+        ApplicationAttemptId.newInstance(applicationId, 1), null, "user", new Configuration(),
+        null)));
+    TezVertexID vertex0 = TezVertexID.getInstance(dagId, 0);
+    recoveryService.handle(new DAGHistoryEvent(dagId, new VertexCommitStartedEvent(vertex0, 0L)));
+    recoveryService.handle(new DAGHistoryEvent(dagId, new VertexFinishedEvent(vertex0, "v1", 10,
+        0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", null, null, null)));
+    recoveryService.stop();
+
+    DAGRecoveryData recovered = parser.parseRecoveryData();
+    assertTrue(recovered.nonRecoverable);
+    assertTrue(recovered.reason.contains(
+        "Vertex has been committed, but its full recovery events are not seen"));
+  }
+  
+  /** With only summary events logged, committed vertex-group members cannot be fully recovered. */
+  @Test(timeout=5000)
+  public void testRecoverableNonSummary2() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagID);
+
+    // MockRecoveryService will skip the non-summary event
+    MockRecoveryService rService = new MockRecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write a DAGSubmittedEvent first to initialize summaryStream
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration(), null)));
+    // It should be fine to skip other events, just for testing.
+    TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
+    TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexGroupCommitStartedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new VertexGroupCommitFinishedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+    rService.stop();
+
+    DAGRecoveryData dagData = parser.parseRecoveryData();
+    assertTrue(dagData.nonRecoverable);
+    assertTrue(dagData.reason.contains("Vertex has been committed as member of vertex group"
+        + ", but its full recovery events are not seen"));
+  }
+
+  @Test(timeout=5000)
+  public void testRecoveryData() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagID);
+
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // DAG  DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration(), null)));
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, 
+        "user", "dagName", null);
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
+    rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent));
+    rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent));
+    
+    // 3 vertices of this dag: v0, v1, v2
+    TezVertexID v0Id = TezVertexID.getInstance(dagID, 0);
+    TezVertexID v1Id = TezVertexID.getInstance(dagID, 1);
+    TezVertexID v2Id = TezVertexID.getInstance(dagID, 2);
+    // v0 VertexInitializedEvent
+    VertexInitializedEvent v0InitedEvent =  new VertexInitializedEvent(v0Id, "v0", 200L, 400L, 2, null, null, null);
+    rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent));
+    // v1 VertexFinishedEvent(KILLED)
+    VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1", 2, 300L, 400L, 
+        500L, 600L, 700L, VertexState.KILLED, 
+        "", null, null, null);
+    rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent));
+    // v2 VertexInitializedEvent -> VertexStartedEvent
+    List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+        new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
+    VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "v2", 200L, 300L,
+        2, null, null, initGeneratedEvents);
+    VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L);
+    rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent));
+    rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent));
+
+    // 3 tasks of v2
+    TezTaskID t0v2Id = TezTaskID.getInstance(v2Id, 0);
+    TezTaskID t1v2Id = TezTaskID.getInstance(v2Id, 1);
+    TezTaskID t2v2Id = TezTaskID.getInstance(v2Id, 2);
+    // t0v2 TaskStartedEvent
+    TaskStartedEvent t0v2StartedEvent = new TaskStartedEvent(t0v2Id, "v2", 400L, 5000L);
+    rService.handle(new DAGHistoryEvent(dagID, t0v2StartedEvent));
+    // t1v2 TaskFinishedEvent
+    TaskFinishedEvent t1v2FinishedEvent = new TaskFinishedEvent(t1v2Id, "v1",
+        0L, 0L, null, TaskState.KILLED, "", null, 4);
+    rService.handle(new DAGHistoryEvent(dagID, t1v2FinishedEvent));
+    // t2v2 TaskStartedEvent -> TaskFinishedEvent
+    TaskStartedEvent t2v2StartedEvent = new TaskStartedEvent(t2v2Id, "v2", 400L, 500L);
+    rService.handle(new DAGHistoryEvent(dagID, t2v2StartedEvent));
+    TaskFinishedEvent t2v2FinishedEvent = new TaskFinishedEvent(t2v2Id, "v1",
+        0L, 0L, null, TaskState.SUCCEEDED, "", null, 4);
+    rService.handle(new DAGHistoryEvent(dagID, t2v2FinishedEvent));
+
+    // attempts under t0v2
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+    NodeId nodeId = NodeId.newInstance("localhost", 9999);
+    TezTaskAttemptID ta0t0v2Id = TezTaskAttemptID.getInstance(t0v2Id, 0);
+    TaskAttemptStartedEvent ta0t0v2StartedEvent = new TaskAttemptStartedEvent(
+        ta0t0v2Id, "v1", 0L, containerId, 
+        nodeId, "", "", "");
+    rService.handle(new DAGHistoryEvent(dagID, ta0t0v2StartedEvent));
+    // attempts under t2v2
+    TezTaskAttemptID ta0t2v2Id = TezTaskAttemptID.getInstance(t2v2Id, 0);
+    TaskAttemptStartedEvent ta0t2v2StartedEvent = new TaskAttemptStartedEvent(
+        ta0t2v2Id, "v1", 500L, containerId, 
+        nodeId, "", "", "");
+    rService.handle(new DAGHistoryEvent(dagID, ta0t2v2StartedEvent));
+    TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent(
+        ta0t2v2Id, "v1", 500L, 600L, 
+        TaskAttemptState.SUCCEEDED, null, "", null, 
+        null, null, 0L, null, 0L);
+    rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent));
+
+    rService.stop();
+
+    DAGRecoveryData dagData = parser.parseRecoveryData();
+    assertFalse(dagData.nonRecoverable);
+    // There's no equals method for the history event, so here only verify the init/start/finish time of each event for simplicity
+    assertEquals(dagInitedEvent.getInitTime(), dagData.getDAGInitializedEvent().getInitTime());
+    assertEquals(dagStartedEvent.getStartTime(), dagData.getDAGStartedEvent().getStartTime());
+    assertNull(dagData.getDAGFinishedEvent());
+
+    VertexRecoveryData v0Data = dagData.getVertexRecoveryData(v0Id);
+    VertexRecoveryData v1Data = dagData.getVertexRecoveryData(v1Id);
+    VertexRecoveryData v2Data = dagData.getVertexRecoveryData(v2Id);
+    assertNotNull(v0Data);
+    assertNotNull(v1Data);
+    assertNotNull(v2Data);
+    assertEquals(v0InitedEvent.getInitedTime(), v0Data.getVertexInitedEvent().getInitedTime());
+    assertNull(v0Data.getVertexStartedEvent());
+    assertNull(v1Data.getVertexInitedEvent());
+    assertEquals(v1FinishedEvent.getFinishTime(), v1Data.getVertexFinishedEvent().getFinishTime());
+    assertEquals(v2InitedEvent.getInitedTime(), v2Data.getVertexInitedEvent().getInitedTime());
+    assertEquals(v2StartedEvent.getStartTime(), v2Data.getVertexStartedEvent().getStartTime());
+
+    TaskRecoveryData t0v2Data = dagData.getTaskRecoveryData(t0v2Id);
+    TaskRecoveryData t1v2Data = dagData.getTaskRecoveryData(t1v2Id);
+    TaskRecoveryData t2v2Data = dagData.getTaskRecoveryData(t2v2Id);
+    assertNotNull(t0v2Data);
+    assertNotNull(t1v2Data);
+    assertNotNull(t2v2Data);
+    assertEquals(t0v2StartedEvent.getStartTime(), t0v2Data.getTaskStartedEvent().getStartTime());
+    assertNull(t0v2Data.getTaskFinishedEvent());
+    assertEquals(t1v2FinishedEvent.getFinishTime(), t1v2Data.getTaskFinishedEvent().getFinishTime());
+    assertNull(t1v2Data.getTaskStartedEvent());
+    assertEquals(t2v2StartedEvent.getStartTime(), t2v2Data.getTaskStartedEvent().getStartTime());
+    assertEquals(t2v2FinishedEvent.getFinishTime(), t2v2Data.getTaskFinishedEvent().getFinishTime());
+
+    TaskAttemptRecoveryData ta0t0v2Data = dagData.getTaskAttemptRecoveryData(ta0t0v2Id);
+    TaskAttemptRecoveryData ta0t2v2Data = dagData.getTaskAttemptRecoveryData(ta0t2v2Id);
+    assertNotNull(ta0t0v2Data);
+    assertNotNull(ta0t2v2Data);
+    assertEquals(ta0t0v2StartedEvent.getStartTime(), ta0t0v2Data.getTaskAttemptStartedEvent().getStartTime());
+    assertNull(ta0t0v2Data.getTaskAttemptFinishedEvent());
+    assertEquals(ta0t2v2StartedEvent.getStartTime(), ta0t2v2Data.getTaskAttemptStartedEvent().getStartTime());
+    assertEquals(ta0t2v2FinishedEvent.getFinishTime(), ta0t2v2Data.getTaskAttemptFinishedEvent().getFinishTime());
+  }
+
+  /**
+   * RecoveryService whose handleRecoveryEvent is a no-op, simulating the case where the summary
+   * events are written but the non-summary recovery events never reach HDFS.
+   */
+  public static class MockRecoveryService extends RecoveryService {
+    public MockRecoveryService(AppContext appContext) {
+      super(appContext);
+    }
+
+    @Override
+    protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+      // Deliberately empty: non-summary events are dropped instead of being persisted.
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 17fa4d9..3f80928 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Random;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -68,7 +69,9 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -83,6 +86,8 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -235,15 +240,18 @@ public class TestTaskCommunicatorManager1 {
   @Test (timeout = 5000)
   public void testTaskEventRouting() throws Exception {
     List<TezEvent> events =  Arrays.asList(
-      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
-      new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
-      new TezEvent(new TaskAttemptCompletedEvent(), null)
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), new EventMetaData(EventProducerConsumerType.PROCESSOR,
+          "v1", "v2", taskAttemptID)),
+      new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.OUTPUT,
+          "v1", "v2", taskAttemptID)),
+      new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+          "v1", "v2", taskAttemptID))
     );
 
     generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(2)).handle(arg.capture());
+    verify(eventHandler, times(4)).handle(arg.capture());
     final List<Event> argAllValues = arg.getAllValues();
 
     final Event statusUpdateEvent = argAllValues.get(0);
@@ -251,28 +259,33 @@ public class TestTaskCommunicatorManager1 {
         statusUpdateEvent.getType());
     assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
 
-    final Event vertexEvent = argAllValues.get(1);
-    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
-    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
-        vertexEvent.getType());
+    final TaskAttemptEventTezEventUpdate taEvent = (TaskAttemptEventTezEventUpdate)argAllValues.get(1);
+    assertEquals(1, taEvent.getTezEvents().size());
+    assertEquals(EventType.DATA_MOVEMENT_EVENT,
+        taEvent.getTezEvents().get(0).getEventType());
+    
+    final TaskAttemptEvent taCompleteEvent = (TaskAttemptEvent)argAllValues.get(2);
+    assertEquals(TaskAttemptEventType.TA_DONE, taCompleteEvent.getType());
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)argAllValues.get(3);
+    assertEquals(1, vertexRouteEvent.getEvents().size());
     assertEquals(EventType.DATA_MOVEMENT_EVENT,
         vertexRouteEvent.getEvents().get(0).getEventType());
-    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
-        vertexRouteEvent.getEvents().get(1).getEventType());
   }
   
   @Test (timeout = 5000)
   public void testTaskEventRoutingWithReadError() throws Exception {
     List<TezEvent> events =  Arrays.asList(
       new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
-      new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
-      new TezEvent(new TaskAttemptCompletedEvent(), null)
+      new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventProducerConsumerType.INPUT,
+          "v2", "v1", taskAttemptID)),
+      new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+          "v1", "v2", taskAttemptID))
     );
 
     generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(2)).handle(arg.capture());
+    verify(eventHandler, times(3)).handle(arg.capture());
     final List<Event> argAllValues = arg.getAllValues();
 
     final Event statusUpdateEvent = argAllValues.get(0);
@@ -280,22 +293,24 @@ public class TestTaskCommunicatorManager1 {
         statusUpdateEvent.getType());
     assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
 
-    final Event vertexEvent = argAllValues.get(1);
+    final Event taFinishedEvent = argAllValues.get(1);
+    assertEquals("Second event should be TA_DONE", TaskAttemptEventType.TA_DONE,
+        taFinishedEvent.getType());
+
+    final Event vertexEvent = argAllValues.get(2);
     final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
-    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+    assertEquals("Third event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
         vertexEvent.getType());
     assertEquals(EventType.INPUT_READ_ERROR_EVENT,
         vertexRouteEvent.getEvents().get(0).getEventType());
-    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
-        vertexRouteEvent.getEvents().get(1).getEventType());
-
   }
 
 
   @Test (timeout = 5000)
   public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
     List<TezEvent> events = Arrays.asList(
-      new TezEvent(new TaskAttemptCompletedEvent(), null)
+      new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+          "v1", "v2", taskAttemptID))
     );
     generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
 
@@ -304,7 +319,8 @@ public class TestTaskCommunicatorManager1 {
     final List<Event> argAllValues = arg.getAllValues();
 
     final Event event = argAllValues.get(0);
-    assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+    // Route to TaskAttempt directly rather than through Vertex
+    assertEquals("only event should be route event", TaskAttemptEventType.TA_DONE,
         event.getType());
   }