You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:14 UTC

[08/50] [abbrv] TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 04e2219..c003e05 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -116,6 +117,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -633,6 +635,208 @@ public class TestVertexImpl {
     return dag;
   }
 
+  private DAGPlan createDAGPlanWithRunningInitializer3() {
+    // v2    v1 (send event to v3)
+    //  \    /
+    //   \  /
+    //   v3 -----(In)
+    //  (Receive events from v1)
+    LOG.info("Setting up dag plan with running input initializer3");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("DagWithInputInitializer3")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(1)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(1)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e2")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                "IrrelevantInitializerClassName"))
+                        .setName("input1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(20)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addInEdgeId("e2")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+
+  private DAGPlan createDAGPlanWithRunningInitializer4() {
+    //   v1 (send event to v3)
+    //    |
+    //    |
+    //   v2   (In)    (v2 can optionally send events to v3. Is set up via the initializer)
+    //    |   /
+    //    |  /
+    //    v3 (Receive events from v1)
+    // Events are not generated by a directly connected vertex
+    LOG.info("Setting up dag plan with running input initializer4");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("DagWithInputInitializer4")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(1)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(1)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addOutEdgeId("e2")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                "IrrelevantInitializerClassName"))
+                        .setName("input1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(20)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addInEdgeId("e2")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex2")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+
+
   private DAGPlan createDAGPlanWithRunningInitializer() {
     LOG.info("Setting up dag plan with running input initializer");
     DAGPlan dag = DAGPlan.newBuilder()
@@ -643,7 +847,7 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(10)
+                        .setNumTasks(1)
                         .setVirtualCores(4)
                         .setMemoryMb(1024)
                         .setJavaOpts("")
@@ -1742,7 +1946,7 @@ public class TestVertexImpl {
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
     }
-    updateTracker = new StateChangeNotifier(dag);
+    updateTracker = new StateChangeNotifier(appContext.getCurrentDAG());
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
       @Override
@@ -2962,16 +3166,390 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
 
     // At this point, 2 events should have been received - since the dispatcher is complete.
-    Assert.assertEquals(2, initializer.stateUpdateEvents.size());
+    Assert.assertEquals(2, initializer.stateUpdates.size());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+        initializer.stateUpdates.get(0).getVertexState());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+        initializer.stateUpdates.get(1).getVertexState());
+  }
+
+  @Test(timeout = 1000000)
+  public void testInputInitializerEventMultipleAttempts() throws Exception {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer4();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+    VertexImplWithRunningInputInitializer v3 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+    initVertex(v1);
+    startVertex(v1);
+    dispatcher.await();
+
+    // Vertex1 start should trigger downstream vertices
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.RUNNING, v2.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+    ByteBuffer expected;
+
+    // Generate events from v1 to v3's InputInitializer
+    ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+    InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 1);
+    expected = payload;
+    event = InputInitializerEvent.create("vertex3", "input1", payload);
+    // Create a second attempt (attempt 1) of the single task that exists in vertex1
+    TezTaskAttemptID ta1_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 1);
+    tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta1_t0_v1));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // Events should not be cached in the vertex, since the initializer is running
+    Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+    // Events should be cached since the tasks have not succeeded.
+    // Verify that events are cached
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        v3.rootInputInitializerManager.getInitializerWrapper("input1");
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(2, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+    // Get all tasks of vertex1 to succeed.
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      // Make attempt 1 of every task succeed
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+      v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+    }
+    dispatcher.await();
+
+    // v3 would have processed an INIT event and moved into INITIALIZING state.
+    // Since source tasks were complete - the events should have been consumed.
+    // Initializer would have run, and processed events.
+    while (v3.getState()  != VertexState.RUNNING) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+    Assert.assertEquals(1, initializer.initializerEvents.size());
+    Assert.assertEquals(expected, initializer.initializerEvents.get(0).getUserPayload());
+
+  }
+
+  @Test(timeout = 10000)
+  public void testInputInitializerEventsMultipleSources() throws Exception {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    initializer.setNumExpectedEvents(4);
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer4();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+    VertexImplWithRunningInputInitializer v3 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+    initVertex(v1);
+    startVertex(v1);
+    dispatcher.await();
+
+    // Vertex1 start should trigger downstream vertices
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.RUNNING, v2.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+    List<ByteBuffer> expectedPayloads = new LinkedList<ByteBuffer>();
+
+    // Generate events from v1 to v3's InputInitializer
+    ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+    expectedPayloads.add(payload);
+    InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // Events should not be cached in the vertex, since the initializer is running
+    Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+    // Events should be cached since the tasks have not succeeded.
+    // Verify that events are cached
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        v3.rootInputInitializerManager.getInitializerWrapper("input1");
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+    // Get all tasks of vertex1 to succeed.
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+    }
+    dispatcher.await();
+
+    Assert.assertEquals(1, initializer.initializerEvents.size());
+
+
+    // Test written based on this
+    Assert.assertEquals(2, v2.getTotalTasks());
+    // Generate events from v2 to v3's initializer. 1 from task 0, 2 from task 1
+    for (Task task : v2.getTasks().values()) {
+      TezTaskID taskId = task.getTaskId();
+      TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      int numEventsFromTask = taskId.getId() + 1;
+      for (int i = 0; i < numEventsFromTask; i++) {
+        payload = ByteBuffer.allocate(12).putInt(0, 2).putInt(4, taskId.getId()).putInt(8, i);
+        expectedPayloads.add(payload);
+        InputInitializerEvent event2 = InputInitializerEvent.create("vertex3", "input1", payload);
+        TezEvent tezEvent2 = new TezEvent(event2,
+            new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", attemptId));
+        dispatcher.getEventHandler()
+            .handle(
+                new VertexEventRouteEvent(v2.getVertexId(), Collections.singletonList(tezEvent2)));
+        dispatcher.await();
+      }
+    }
+
+    // Validate queueing of these events
+    // Only v2 events pending
+    Assert.assertEquals(1, initializerWrapper.getPendingEvents().keySet().size());
+    // 3 events pending
+    Assert.assertEquals(3, initializerWrapper.getPendingEvents().get(v2.getName()).size());
+
+    // Get all tasks of vertex2 to succeed.
+    for (TezTaskID taskId : v2.getTasks().keySet()) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      v2.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v2.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v2.stateChangeNotifier.taskSucceeded(v2.getName(), taskId, taskAttemptId.getId());
+    }
+    dispatcher.await();
+
+    // v3 would have processed an INIT event and moved into INITIALIZING state.
+    // Since source tasks were complete - the events should have been consumed.
+    // Initializer would have run, and processed events.
+    while (v3.getState()  != VertexState.RUNNING) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+    Assert.assertEquals(4, initializer.initializerEvents.size());
+    Assert.assertTrue(initializer.initComplete.get());
+
+    Assert.assertEquals(2, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+    for (InputInitializerEvent initializerEvent : initializer.initializerEvents) {
+      expectedPayloads.remove(initializerEvent.getUserPayload());
+    }
+    Assert.assertEquals(0, expectedPayloads.size());
+
+  }
+
+  @Test(timeout = 10000)
+  public void testInputInitializerEventNoDirectConnection() throws Exception {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer4();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+    VertexImplWithRunningInputInitializer v3 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+    initVertex(v1);
+    startVertex(v1);
+    dispatcher.await();
+
+    // Vertex1 start should trigger downstream vertices
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.RUNNING, v2.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+    // Generate events from v1 to v3's InputInitializer
+    InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // Events should not be cached in the vertex, since the initializer is running
+    Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+    // Events should be cached since the tasks have not succeeded.
+    // Verify that events are cached
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        v3.rootInputInitializerManager.getInitializerWrapper("input1");
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+    // Get all tasks of vertex1 to succeed.
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+    }
+    dispatcher.await();
+
+    // v3 would have processed an INIT event and moved into INITIALIZING state.
+    // Since source tasks were complete - the events should have been consumed.
+    // Initializer would have run, and processed events.
+    while (v3.getState()  != VertexState.RUNNING) {
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+    Assert.assertTrue(initializer.eventReceived.get());
+    Assert.assertEquals(2, initializer.stateUpdates.size());
     Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
-        initializer.stateUpdateEvents.get(0).getVertexState());
+        initializer.stateUpdates.get(0).getVertexState());
     Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
-        initializer.stateUpdateEvents.get(1).getVertexState());
+        initializer.stateUpdates.get(1).getVertexState());
+  }
+
+  @Test(timeout = 10000)
+  public void testInputInitializerEventsAtNew() throws Exception {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer3();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+    VertexImplWithRunningInputInitializer v3 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+    initVertex(v1);
+    startVertex(v1);
+    dispatcher.await();
+
+    // Vertex2 has not been INITED, so the rest of the vertices should be in state NEW.
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.NEW, v2.getState());
+    Assert.assertEquals(VertexState.NEW, v3.getState());
+
+    // Generate events from v1 to v3's InputInitializer
+    InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // Events should be cached in the vertex, since the Initializer has not started
+    Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+
+    // Get Vertex1 to succeed before Vertex2 is INITED. Contrived case ? This is likely a tiny race.
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      TaskImpl task = (TaskImpl)v1.getTask(taskId);
+      task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED));
+      task.handle(new TaskEventTAUpdate(taskAttemptId, TaskEventType.T_ATTEMPT_SUCCEEDED));
+      v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+    }
+    dispatcher.await();
+
+    // Events should still be cached in the vertex
+    Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+    Assert.assertEquals(VertexState.NEW, v3.getState());
+
+    // Move processing along. INIT the remaining root level vertex.
+    initVertex(v2);
+    startVertex(v2);
+    dispatcher.await();
+
+
+    // v3 would have processed an INIT event and moved into INITIALIZING state.
+    // Since source tasks were complete - the events should have been consumed.
+    // Initializer would have run, and processed events.
+    while (v3.getState()  != VertexState.RUNNING) {
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
+    // Events should have been cleared from the vertex.
+    Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+    // KK Add checks to validate the RootInputManager doesn't remember the events either
+
+    Assert.assertTrue(initializer.eventReceived.get());
+    Assert.assertEquals(2, initializer.stateUpdates.size());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+        initializer.stateUpdates.get(0).getVertexState());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+        initializer.stateUpdates.get(1).getVertexState());
   }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 10000)
-  public void testRootInputInitializerEvent() throws Exception {
+  public void testInputInitializerEvents() throws Exception {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null);
     EventHandlingRootInputInitializer initializer =
@@ -2998,15 +3576,36 @@ public class TestVertexImpl {
     Assert.assertFalse(initializer.eventReceived.get());
     Assert.assertFalse(initializer.initComplete.get());
 
+
     // Signal the initializer by sending an event - via vertex1
     InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
     TezEvent tezEvent = new TezEvent(event,
-        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, null));
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
 
     dispatcher.getEventHandler()
         .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
     dispatcher.await();
 
+    // Events should not be cached in the vertex, since the initializer is running
+    Assert.assertEquals(0, v2.pendingInitializerEvents.size());
+
+    // Verify that events are cached
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        v2.rootInputInitializerManager.getInitializerWrapper("input1");
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+      dispatcher.await();
+      v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+    }
+
     // Both happening in separate threads
     while (!initializer.eventReceived.get()) {
       Thread.sleep(10);
@@ -3019,6 +3618,10 @@ public class TestVertexImpl {
     while (v2.getState()  != VertexState.RUNNING) {
       Thread.sleep(10);
     }
+
+    // Verify the events are no longer cached, but attempts are remembered
+    Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+    Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
   }
 
   @Test(timeout = 5000)
@@ -3358,7 +3961,7 @@ public class TestVertexImpl {
     VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     
-    // non-task events dont get buffered
+    // non-task events don't get buffered
     List<TezEvent> events = Lists.newLinkedList();
     TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
     TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
@@ -3954,8 +4557,10 @@ public class TestVertexImpl {
     private final ReentrantLock lock = new ReentrantLock();
     private final Condition eventCondition = lock.newCondition();
 
-    private final List<VertexStateUpdate> stateUpdateEvents = new LinkedList<VertexStateUpdate>();
+    private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
+    private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
     private volatile InputInitializerContext context;
+    private volatile int numExpectedEvents = 1;
 
     public EventHandlingRootInputInitializer(
         InputInitializerContext initializerContext) {
@@ -3968,7 +4573,9 @@ public class TestVertexImpl {
       initStarted.set(true);
       lock.lock();
       try {
-        eventCondition.await();
+        if (!eventReceived.get()) {
+          eventCondition.await();
+        }
       } finally {
         lock.unlock();
       }
@@ -3983,12 +4590,15 @@ public class TestVertexImpl {
     @Override
     public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
         Exception {
-      eventReceived.set(true);
-      lock.lock();
-      try {
-        eventCondition.signal();
-      } finally {
-        lock.unlock();
+      initializerEvents.addAll(events);
+      if (initializerEvents.size() == numExpectedEvents) {
+        eventReceived.set(true);
+        lock.lock();
+        try {
+          eventCondition.signal();
+        } finally {
+          lock.unlock();
+        }
       }
     }
 
@@ -3997,8 +4607,12 @@ public class TestVertexImpl {
       this.context = context;
     }
 
+    public void setNumExpectedEvents(int numEvents) {
+      this.numExpectedEvents = numEvents;
+    }
+
     public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-      stateUpdateEvents.add(stateUpdate);
+      stateUpdates.add(stateUpdate);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index e2f189c..9042a93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -62,7 +62,7 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -403,7 +403,7 @@ public class TestVertexRecovery {
   public void testRecovery_New_Desired_RUNNING() {
     VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
     VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.NEW, recoveredState);
     assertEquals(1, vertex1.recoveredEvents.size());
@@ -478,7 +478,7 @@ public class TestVertexRecovery {
     restoreFromInitializedEvent(vertex1);
 
     VertexState recoveredState =
-        vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.INITED, recoveredState);
 
@@ -522,7 +522,7 @@ public class TestVertexRecovery {
     assertEquals(startedTime, vertex1.startedTime);
 
     recoveredState =
-        vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.RUNNING, recoveredState);
     assertEquals(1, vertex1.recoveredEvents.size());
@@ -616,7 +616,7 @@ public class TestVertexRecovery {
 
     VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
     VertexState recoveredState =
-        vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.NEW, recoveredState);
     assertEquals(1, vertex3.recoveredEvents.size());
@@ -669,7 +669,7 @@ public class TestVertexRecovery {
 
     VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
     VertexState recoveredState =
-        vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.NEW, recoveredState);
     assertEquals(1, vertex3.recoveredEvents.size());
@@ -734,7 +734,7 @@ public class TestVertexRecovery {
 
     VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
     recoveredState =
-        vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.NEW, recoveredState);
     assertEquals(1, vertex3.recoveredEvents.size());
@@ -814,7 +814,7 @@ public class TestVertexRecovery {
 
     VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
     recoveredState =
-        vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
             vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
     assertEquals(VertexState.NEW, recoveredState);
     assertEquals(1, vertex3.recoveredEvents.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 5bb7d35..bcbe6f1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -527,9 +527,9 @@ public class TestHistoryEventsProtoConversion {
   }
 
   private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
-    VertexDataMovementEventsGeneratedEvent event;
+    VertexRecoverableEventsGeneratedEvent event;
     try {
-      event = new VertexDataMovementEventsGeneratedEvent(
+      event = new VertexRecoverableEventsGeneratedEvent(
           TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
       Assert.fail("Invalid creation should have errored out");
@@ -539,11 +539,11 @@ public class TestHistoryEventsProtoConversion {
     List<TezEvent> events =
         Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
             new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
-    event = new VertexDataMovementEventsGeneratedEvent(
+    event = new VertexRecoverableEventsGeneratedEvent(
             TezVertexID.getInstance(
                 TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
-    VertexDataMovementEventsGeneratedEvent deserializedEvent =
-        (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+    VertexRecoverableEventsGeneratedEvent deserializedEvent =
+        (VertexRecoverableEventsGeneratedEvent) testProtoConversion(event);
     Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
     Assert.assertEquals(1,
         deserializedEvent.getTezEvents().size());

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ad39531..f674fc0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventJsonConversion {
           event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
           break;
         case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexDataMovementEventsGeneratedEvent();
+          event = new VertexRecoverableEventsGeneratedEvent();
           break;
         case DAG_COMMIT_STARTED:
           event = new DAGCommitStartedEvent();

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 5b19e80..b04b8d4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventTimelineConversion {
           event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
           break;
         case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexDataMovementEventsGeneratedEvent();
+          event = new VertexRecoverableEventsGeneratedEvent();
           break;
         case DAG_COMMIT_STARTED:
           event = new DAGCommitStartedEvent();

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 7676313..9595cb9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -42,8 +42,8 @@ import org.apache.tez.dag.api.client.DAGStatus.State;
 import org.apache.tez.dag.app.RecoveryParser;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.test.dag.MultiAttemptDAG;
 import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
 import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
@@ -198,8 +198,8 @@ public class TestDAGRecovery {
             + ", eventType=" + historyEvent.getEventType()
             + ", event=" + historyEvent);
         if (historyEvent.getEventType() ==  HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
-          VertexDataMovementEventsGeneratedEvent dmEvent =
-              (VertexDataMovementEventsGeneratedEvent)historyEvent;
+          VertexRecoverableEventsGeneratedEvent dmEvent =
+              (VertexRecoverableEventsGeneratedEvent) historyEvent;
           // TODO do not need to check whether it is -1 after Tez-1521 is resolved
           if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
             inputInfoEventIndex = j;