You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:14 UTC
[08/50] [abbrv] TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 04e2219..c003e05 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -116,6 +117,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -633,6 +635,208 @@ public class TestVertexImpl {
return dag;
}
+ private DAGPlan createDAGPlanWithRunningInitializer3() {
+ // v2 v1 (send event to v3)
+ // \ /
+ // \ /
+ // v3 -----(In)
+ // (Receive events from v1)
+ LOG.info("Setting up dag plan with running input initializer3");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("DagWithInputInitializer3")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "IrrelevantInitializerClassName"))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(20)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addInEdgeId("e2")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
+
+ private DAGPlan createDAGPlanWithRunningInitializer4() {
+ // v1 (send event to v3)
+ // |
+ // |
+ // v2 (In) (v2 can optionally send events to v3. Is set up via the initializer)
+ // | /
+ // | /
+ // v3 (Receive events from v1)
+ // Events are not generated by a directly connected vertex
+ LOG.info("Setting up dag plan with running input initializer4");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("DagWithInputInitializer4")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "IrrelevantInitializerClassName"))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(20)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e2")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
+
+
private DAGPlan createDAGPlanWithRunningInitializer() {
LOG.info("Setting up dag plan with running input initializer");
DAGPlan dag = DAGPlan.newBuilder()
@@ -643,7 +847,7 @@ public class TestVertexImpl {
.setType(PlanVertexType.NORMAL)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(10)
+ .setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
@@ -1742,7 +1946,7 @@ public class TestVertexImpl {
for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
- updateTracker = new StateChangeNotifier(dag);
+ updateTracker = new StateChangeNotifier(appContext.getCurrentDAG());
setupVertices();
when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
@Override
@@ -2962,16 +3166,390 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
// At this point, 2 events should have been received - since the dispatcher is complete.
- Assert.assertEquals(2, initializer.stateUpdateEvents.size());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+ initializer.stateUpdates.get(0).getVertexState());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+ initializer.stateUpdates.get(1).getVertexState());
+ }
+
+ @Test(timeout = 1000000)
+ public void testInputInitializerEventMultipleAttempts() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ ByteBuffer expected;
+
+ // Generate events from v1 to v3's InputInitializer
+ ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 1);
+ expected = payload;
+ event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskAttemptID ta1_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 1);
+ tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta1_t0_v1));
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(2, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ // Make attempt 1 of every task succeed
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(1, initializer.initializerEvents.size());
+ Assert.assertEquals(expected, initializer.initializerEvents.get(0).getUserPayload());
+
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventsMultipleSources() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ initializer.setNumExpectedEvents(4);
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ List<ByteBuffer> expectedPayloads = new LinkedList<ByteBuffer>();
+
+ // Generate events from v1 to v3's InputInitializer
+ ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+ expectedPayloads.add(payload);
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ Assert.assertEquals(1, initializer.initializerEvents.size());
+
+
+ // The event generation below assumes vertex2 has exactly 2 tasks
+ Assert.assertEquals(2, v2.getTotalTasks());
+ // Generate events from v2 to v3's initializer. 1 from task 0, 2 from task 1
+ for (Task task : v2.getTasks().values()) {
+ TezTaskID taskId = task.getTaskId();
+ TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ int numEventsFromTask = taskId.getId() + 1;
+ for (int i = 0; i < numEventsFromTask; i++) {
+ payload = ByteBuffer.allocate(12).putInt(0, 2).putInt(4, taskId.getId()).putInt(8, i);
+ expectedPayloads.add(payload);
+ InputInitializerEvent event2 = InputInitializerEvent.create("vertex3", "input1", payload);
+ TezEvent tezEvent2 = new TezEvent(event2,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", attemptId));
+ dispatcher.getEventHandler()
+ .handle(
+ new VertexEventRouteEvent(v2.getVertexId(), Collections.singletonList(tezEvent2)));
+ dispatcher.await();
+ }
+ }
+
+ // Validate queueing of these events
+ // Only v2 events pending
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().keySet().size());
+ // 3 events pending
+ Assert.assertEquals(3, initializerWrapper.getPendingEvents().get(v2.getName()).size());
+
+ // Get all tasks of vertex2 to succeed.
+ for (TezTaskID taskId : v2.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v2.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v2.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v2.stateChangeNotifier.taskSucceeded(v2.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(4, initializer.initializerEvents.size());
+ Assert.assertTrue(initializer.initComplete.get());
+
+ Assert.assertEquals(2, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ for (InputInitializerEvent initializerEvent : initializer.initializerEvents) {
+ expectedPayloads.remove(initializerEvent.getUserPayload());
+ }
+ Assert.assertEquals(0, expectedPayloads.size());
+
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventNoDirectConnection() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ // Generate events from v1 to v3's InputInitializer
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ Assert.assertTrue(initializer.eventReceived.get());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
- initializer.stateUpdateEvents.get(0).getVertexState());
+ initializer.stateUpdates.get(0).getVertexState());
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
- initializer.stateUpdateEvents.get(1).getVertexState());
+ initializer.stateUpdates.get(1).getVertexState());
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventsAtNew() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer3();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex2 has not been INITED, so the rest of the vertices should be in state NEW.
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.NEW, v2.getState());
+ Assert.assertEquals(VertexState.NEW, v3.getState());
+
+ // Generate events from v1 to v3's InputInitializer
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should be cached in the vertex, since the Initializer has not started
+ Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+
+ // Get Vertex1 to succeed before Vertex2 is INITED. Contrived case? This is likely a tiny race.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ TaskImpl task = (TaskImpl)v1.getTask(taskId);
+ task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED));
+ task.handle(new TaskEventTAUpdate(taskAttemptId, TaskEventType.T_ATTEMPT_SUCCEEDED));
+ v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // Events should still be cached in the vertex
+ Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+ Assert.assertEquals(VertexState.NEW, v3.getState());
+
+ // Move processing along. INIT the remaining root level vertex.
+ initVertex(v2);
+ startVertex(v2);
+ dispatcher.await();
+
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+ // Events should have been cleared from the vertex.
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // KK Add checks to validate the RootInputInitializerManager doesn't remember the events either
+
+ Assert.assertTrue(initializer.eventReceived.get());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+ initializer.stateUpdates.get(0).getVertexState());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+ initializer.stateUpdates.get(1).getVertexState());
}
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
- public void testRootInputInitializerEvent() throws Exception {
+ public void testInputInitializerEvents() throws Exception {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null);
EventHandlingRootInputInitializer initializer =
@@ -2998,15 +3576,36 @@ public class TestVertexImpl {
Assert.assertFalse(initializer.eventReceived.get());
Assert.assertFalse(initializer.initComplete.get());
+
// Signal the initializer by sending an event - via vertex1
InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
TezEvent tezEvent = new TezEvent(event,
- new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, null));
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
dispatcher.getEventHandler()
.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
dispatcher.await();
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v2.pendingInitializerEvents.size());
+
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v2.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+
// Both happening in separate threads
while (!initializer.eventReceived.get()) {
Thread.sleep(10);
@@ -3019,6 +3618,10 @@ public class TestVertexImpl {
while (v2.getState() != VertexState.RUNNING) {
Thread.sleep(10);
}
+
+ // Verify the events are no longer cached, but attempts are remembered
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
}
@Test(timeout = 5000)
@@ -3358,7 +3961,7 @@ public class TestVertexImpl {
VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
- // non-task events dont get buffered
+ // non-task events don't get buffered
List<TezEvent> events = Lists.newLinkedList();
TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
@@ -3954,8 +4557,10 @@ public class TestVertexImpl {
private final ReentrantLock lock = new ReentrantLock();
private final Condition eventCondition = lock.newCondition();
- private final List<VertexStateUpdate> stateUpdateEvents = new LinkedList<VertexStateUpdate>();
+ private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
+ private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
private volatile InputInitializerContext context;
+ private volatile int numExpectedEvents = 1;
public EventHandlingRootInputInitializer(
InputInitializerContext initializerContext) {
@@ -3968,7 +4573,9 @@ public class TestVertexImpl {
initStarted.set(true);
lock.lock();
try {
- eventCondition.await();
+ if (!eventReceived.get()) {
+ eventCondition.await();
+ }
} finally {
lock.unlock();
}
@@ -3983,12 +4590,15 @@ public class TestVertexImpl {
@Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
Exception {
- eventReceived.set(true);
- lock.lock();
- try {
- eventCondition.signal();
- } finally {
- lock.unlock();
+ initializerEvents.addAll(events);
+ if (initializerEvents.size() == numExpectedEvents) {
+ eventReceived.set(true);
+ lock.lock();
+ try {
+ eventCondition.signal();
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -3997,8 +4607,12 @@ public class TestVertexImpl {
this.context = context;
}
+ public void setNumExpectedEvents(int numEvents) {
+ this.numExpectedEvents = numEvents;
+ }
+
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
- stateUpdateEvents.add(stateUpdate);
+ stateUpdates.add(stateUpdate);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index e2f189c..9042a93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -62,7 +62,7 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -403,7 +403,7 @@ public class TestVertexRecovery {
public void testRecovery_New_Desired_RUNNING() {
VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
VertexState recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex1.recoveredEvents.size());
@@ -478,7 +478,7 @@ public class TestVertexRecovery {
restoreFromInitializedEvent(vertex1);
VertexState recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.INITED, recoveredState);
@@ -522,7 +522,7 @@ public class TestVertexRecovery {
assertEquals(startedTime, vertex1.startedTime);
recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.RUNNING, recoveredState);
assertEquals(1, vertex1.recoveredEvents.size());
@@ -616,7 +616,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
VertexState recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -669,7 +669,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
VertexState recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -734,7 +734,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -814,7 +814,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 5bb7d35..bcbe6f1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -527,9 +527,9 @@ public class TestHistoryEventsProtoConversion {
}
private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
- VertexDataMovementEventsGeneratedEvent event;
+ VertexRecoverableEventsGeneratedEvent event;
try {
- event = new VertexDataMovementEventsGeneratedEvent(
+ event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
Assert.fail("Invalid creation should have errored out");
@@ -539,11 +539,11 @@ public class TestHistoryEventsProtoConversion {
List<TezEvent> events =
Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
- event = new VertexDataMovementEventsGeneratedEvent(
+ event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
- VertexDataMovementEventsGeneratedEvent deserializedEvent =
- (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+ VertexRecoverableEventsGeneratedEvent deserializedEvent =
+ (VertexRecoverableEventsGeneratedEvent) testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
Assert.assertEquals(1,
deserializedEvent.getTezEvents().size());
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ad39531..f674fc0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventJsonConversion {
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 5b19e80..b04b8d4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventTimelineConversion {
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 7676313..9595cb9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -42,8 +42,8 @@ import org.apache.tez.dag.api.client.DAGStatus.State;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
@@ -198,8 +198,8 @@ public class TestDAGRecovery {
+ ", eventType=" + historyEvent.getEventType()
+ ", event=" + historyEvent);
if (historyEvent.getEventType() == HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
- VertexDataMovementEventsGeneratedEvent dmEvent =
- (VertexDataMovementEventsGeneratedEvent)historyEvent;
+ VertexRecoverableEventsGeneratedEvent dmEvent =
+ (VertexRecoverableEventsGeneratedEvent) historyEvent;
// TODO do not need to check whether it is -1 after Tez-1521 is resolved
if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
inputInfoEventIndex = j;