You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/09/12 11:37:28 UTC
[1/2] TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)
Repository: tez
Updated Branches:
refs/heads/master fb05aace5 -> b4580a7b8
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 04e2219..c003e05 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -116,6 +117,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -633,6 +635,208 @@ public class TestVertexImpl {
return dag;
}
+ private DAGPlan createDAGPlanWithRunningInitializer3() {
+ // v2 v1 (send event to v3)
+ // \ /
+ // \ /
+ // v3 -----(In)
+ // (Receive events from v1)
+ LOG.info("Setting up dag plan with running input initializer3");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("DagWithInputInitializer3")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "IrrelevantInitializerClassName"))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(20)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addInEdgeId("e2")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
+
+ private DAGPlan createDAGPlanWithRunningInitializer4() {
+ // v1 (send event to v3)
+ // |
+ // |
+ // v2 (In) (v2 can optionally send events to v3. Is set up via the initializer)
+ // | /
+ // | /
+ // v3 (Receive events from v1)
+ // Events are not generated by a directly connected vertex
+ LOG.info("Setting up dag plan with running input initializer4");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("DagWithInputInitializer4")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(1)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "IrrelevantInitializerClassName"))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(20)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e2")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
+
+
private DAGPlan createDAGPlanWithRunningInitializer() {
LOG.info("Setting up dag plan with running input initializer");
DAGPlan dag = DAGPlan.newBuilder()
@@ -643,7 +847,7 @@ public class TestVertexImpl {
.setType(PlanVertexType.NORMAL)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(10)
+ .setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
@@ -1742,7 +1946,7 @@ public class TestVertexImpl {
for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
- updateTracker = new StateChangeNotifier(dag);
+ updateTracker = new StateChangeNotifier(appContext.getCurrentDAG());
setupVertices();
when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
@Override
@@ -2962,16 +3166,390 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
// At this point, 2 events should have been received - since the dispatcher is complete.
- Assert.assertEquals(2, initializer.stateUpdateEvents.size());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+ initializer.stateUpdates.get(0).getVertexState());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+ initializer.stateUpdates.get(1).getVertexState());
+ }
+
+ @Test(timeout = 1000000)
+ public void testInputInitializerEventMultipleAttempts() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ ByteBuffer expected;
+
+ // Generate events from v1 to v3's InputInitializer
+ ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 1);
+ expected = payload;
+ event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskAttemptID ta1_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 1);
+ tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta1_t0_v1));
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(2, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ // Make attempt 1 of every task succeed
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(1, initializer.initializerEvents.size());
+ Assert.assertEquals(expected, initializer.initializerEvents.get(0).getUserPayload());
+
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventsMultipleSources() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ initializer.setNumExpectedEvents(4);
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ List<ByteBuffer> expectedPayloads = new LinkedList<ByteBuffer>();
+
+ // Generate events from v1 to v3's InputInitializer
+ ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
+ expectedPayloads.add(payload);
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", payload);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ Assert.assertEquals(1, initializer.initializerEvents.size());
+
+
+ // Test written based on this
+ Assert.assertEquals(2, v2.getTotalTasks());
+ // Generate events from v2 to v3's initializer. 1 from task 0, 2 from task 1
+ for (Task task : v2.getTasks().values()) {
+ TezTaskID taskId = task.getTaskId();
+ TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ int numEventsFromTask = taskId.getId() + 1;
+ for (int i = 0; i < numEventsFromTask; i++) {
+ payload = ByteBuffer.allocate(12).putInt(0, 2).putInt(4, taskId.getId()).putInt(8, i);
+ expectedPayloads.add(payload);
+ InputInitializerEvent event2 = InputInitializerEvent.create("vertex3", "input1", payload);
+ TezEvent tezEvent2 = new TezEvent(event2,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", attemptId));
+ dispatcher.getEventHandler()
+ .handle(
+ new VertexEventRouteEvent(v2.getVertexId(), Collections.singletonList(tezEvent2)));
+ dispatcher.await();
+ }
+ }
+
+ // Validate queueing of these events
+ // Only v2 events pending
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().keySet().size());
+ // 3 events pending
+ Assert.assertEquals(3, initializerWrapper.getPendingEvents().get(v2.getName()).size());
+
+ // Get all tasks of vertex2 to succeed.
+ for (TezTaskID taskId : v2.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v2.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v2.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v2.stateChangeNotifier.taskSucceeded(v2.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(4, initializer.initializerEvents.size());
+ Assert.assertTrue(initializer.initComplete.get());
+
+ Assert.assertEquals(2, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ for (InputInitializerEvent initializerEvent : initializer.initializerEvents) {
+ expectedPayloads.remove(initializerEvent.getUserPayload());
+ }
+ Assert.assertEquals(0, expectedPayloads.size());
+
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventNoDirectConnection() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer4();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex1 start should trigger downstream vertices
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.RUNNING, v2.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+
+ // Generate events from v1 to v3's InputInitializer
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // Events should be cached since the tasks have not succeeded.
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v3.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ // Get all tasks of vertex1 to succeed.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle( new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ Assert.assertTrue(initializer.eventReceived.get());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
- initializer.stateUpdateEvents.get(0).getVertexState());
+ initializer.stateUpdates.get(0).getVertexState());
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
- initializer.stateUpdateEvents.get(1).getVertexState());
+ initializer.stateUpdates.get(1).getVertexState());
+ }
+
+ @Test(timeout = 10000)
+ public void testInputInitializerEventsAtNew() throws Exception {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer3();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+ VertexImplWithRunningInputInitializer v3 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex3");
+
+ initVertex(v1);
+ startVertex(v1);
+ dispatcher.await();
+
+ // Vertex2 has not been INITED, so the rest of the vertices should be in state NEW.
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.NEW, v2.getState());
+ Assert.assertEquals(VertexState.NEW, v3.getState());
+
+ // Generate events from v1 to v3's InputInitializer
+ InputInitializerEvent event = InputInitializerEvent.create("vertex3", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
+
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // Events should be cached in the vertex, since the Initializer has not started
+ Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+
+ // Get Vertex1 to succeed before Vertex2 is INITED. Contrived case ? This is likely a tiny race.
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ TaskImpl task = (TaskImpl)v1.getTask(taskId);
+ task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED));
+ task.handle(new TaskEventTAUpdate(taskAttemptId, TaskEventType.T_ATTEMPT_SUCCEEDED));
+ v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+ dispatcher.await();
+
+ // Events should still be cached in the vertex
+ Assert.assertEquals(1, v3.pendingInitializerEvents.size());
+ Assert.assertEquals(VertexState.NEW, v3.getState());
+
+ // Move processing along. INIT the remaining root level vertex.
+ initVertex(v2);
+ startVertex(v2);
+ dispatcher.await();
+
+
+ // v3 would have processed an INIT event and moved into INITIALIZING state.
+ // Since source tasks were complete - the events should have been consumed.
+ // Initializer would have run, and processed events.
+ while (v3.getState() != VertexState.RUNNING) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(VertexState.RUNNING, v3.getState());
+ // Events should have been cleared from the vertex.
+ Assert.assertEquals(0, v3.pendingInitializerEvents.size());
+
+ // KK Add checks to validate that the RootInputManager doesn't remember the events either
+
+ Assert.assertTrue(initializer.eventReceived.get());
+ Assert.assertEquals(2, initializer.stateUpdates.size());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+ initializer.stateUpdates.get(0).getVertexState());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+ initializer.stateUpdates.get(1).getVertexState());
}
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
- public void testRootInputInitializerEvent() throws Exception {
+ public void testInputInitializerEvents() throws Exception {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null);
EventHandlingRootInputInitializer initializer =
@@ -2998,15 +3576,36 @@ public class TestVertexImpl {
Assert.assertFalse(initializer.eventReceived.get());
Assert.assertFalse(initializer.initComplete.get());
+
// Signal the initializer by sending an event - via vertex1
InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
TezEvent tezEvent = new TezEvent(event,
- new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, null));
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
dispatcher.getEventHandler()
.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
dispatcher.await();
+ // Events should not be cached in the vertex, since the initializer is running
+ Assert.assertEquals(0, v2.pendingInitializerEvents.size());
+
+ // Verify that events are cached
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ v2.rootInputInitializerManager.getInitializerWrapper("input1");
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(1, initializerWrapper.getPendingEvents().get(v1.getName()).size());
+
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ dispatcher.await();
+ v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
+ }
+
// Both happening in separate threads
while (!initializer.eventReceived.get()) {
Thread.sleep(10);
@@ -3019,6 +3618,10 @@ public class TestVertexImpl {
while (v2.getState() != VertexState.RUNNING) {
Thread.sleep(10);
}
+
+ // Verify the events are no longer cached, but attempts are remembered
+ Assert.assertEquals(1, initializerWrapper.getFirstSuccessfulAttemptMap().size());
+ Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size());
}
@Test(timeout = 5000)
@@ -3358,7 +3961,7 @@ public class TestVertexImpl {
VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
- // non-task events dont get buffered
+ // non-task events don't get buffered
List<TezEvent> events = Lists.newLinkedList();
TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
@@ -3954,8 +4557,10 @@ public class TestVertexImpl {
private final ReentrantLock lock = new ReentrantLock();
private final Condition eventCondition = lock.newCondition();
- private final List<VertexStateUpdate> stateUpdateEvents = new LinkedList<VertexStateUpdate>();
+ private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
+ private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
private volatile InputInitializerContext context;
+ private volatile int numExpectedEvents = 1;
public EventHandlingRootInputInitializer(
InputInitializerContext initializerContext) {
@@ -3968,7 +4573,9 @@ public class TestVertexImpl {
initStarted.set(true);
lock.lock();
try {
- eventCondition.await();
+ if (!eventReceived.get()) {
+ eventCondition.await();
+ }
} finally {
lock.unlock();
}
@@ -3983,12 +4590,15 @@ public class TestVertexImpl {
@Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
Exception {
- eventReceived.set(true);
- lock.lock();
- try {
- eventCondition.signal();
- } finally {
- lock.unlock();
+ initializerEvents.addAll(events);
+ if (initializerEvents.size() == numExpectedEvents) {
+ eventReceived.set(true);
+ lock.lock();
+ try {
+ eventCondition.signal();
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -3997,8 +4607,12 @@ public class TestVertexImpl {
this.context = context;
}
+ public void setNumExpectedEvents(int numEvents) {
+ this.numExpectedEvents = numEvents;
+ }
+
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
- stateUpdateEvents.add(stateUpdate);
+ stateUpdates.add(stateUpdate);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index e2f189c..9042a93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -62,7 +62,7 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -403,7 +403,7 @@ public class TestVertexRecovery {
public void testRecovery_New_Desired_RUNNING() {
VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
VertexState recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex1.recoveredEvents.size());
@@ -478,7 +478,7 @@ public class TestVertexRecovery {
restoreFromInitializedEvent(vertex1);
VertexState recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.INITED, recoveredState);
@@ -522,7 +522,7 @@ public class TestVertexRecovery {
assertEquals(startedTime, vertex1.startedTime);
recoveredState =
- vertex1.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.RUNNING, recoveredState);
assertEquals(1, vertex1.recoveredEvents.size());
@@ -616,7 +616,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
VertexState recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -669,7 +669,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
VertexState recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -734,7 +734,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
@@ -814,7 +814,7 @@ public class TestVertexRecovery {
VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
recoveredState =
- vertex3.restoreFromEvent(new VertexDataMovementEventsGeneratedEvent(
+ vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
assertEquals(VertexState.NEW, recoveredState);
assertEquals(1, vertex3.recoveredEvents.size());
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 5bb7d35..bcbe6f1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -527,9 +527,9 @@ public class TestHistoryEventsProtoConversion {
}
private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
- VertexDataMovementEventsGeneratedEvent event;
+ VertexRecoverableEventsGeneratedEvent event;
try {
- event = new VertexDataMovementEventsGeneratedEvent(
+ event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
Assert.fail("Invalid creation should have errored out");
@@ -539,11 +539,11 @@ public class TestHistoryEventsProtoConversion {
List<TezEvent> events =
Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
- event = new VertexDataMovementEventsGeneratedEvent(
+ event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
- VertexDataMovementEventsGeneratedEvent deserializedEvent =
- (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+ VertexRecoverableEventsGeneratedEvent deserializedEvent =
+ (VertexRecoverableEventsGeneratedEvent) testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
Assert.assertEquals(1,
deserializedEvent.getTezEvents().size());
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ad39531..f674fc0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventJsonConversion {
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 5b19e80..b04b8d4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -152,7 +152,7 @@ public class TestHistoryEventTimelineConversion {
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 7676313..9595cb9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -42,8 +42,8 @@ import org.apache.tez.dag.api.client.DAGStatus.State;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
@@ -198,8 +198,8 @@ public class TestDAGRecovery {
+ ", eventType=" + historyEvent.getEventType()
+ ", event=" + historyEvent);
if (historyEvent.getEventType() == HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
- VertexDataMovementEventsGeneratedEvent dmEvent =
- (VertexDataMovementEventsGeneratedEvent)historyEvent;
+ VertexRecoverableEventsGeneratedEvent dmEvent =
+ (VertexRecoverableEventsGeneratedEvent) historyEvent;
// TODO do not need to check whether it is -1 after Tez-1521 is resolved
if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
inputInfoEventIndex = j;
[2/2] git commit: TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)
Posted by ss...@apache.org.
TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b4580a7b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b4580a7b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b4580a7b
Branch: refs/heads/master
Commit: b4580a7b81ab1a5619987296641ebbf50fda4c55
Parents: fb05aac
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 12 02:37:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 12 02:37:11 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../tez/runtime/api/InputInitializer.java | 5 +-
.../api/events/InputInitializerEvent.java | 23 +
.../org/apache/tez/dag/app/RecoveryParser.java | 8 +-
.../app/dag/RootInputInitializerManager.java | 194 +++++-
.../tez/dag/app/dag/StateChangeNotifier.java | 76 ++-
.../dag/app/dag/TaskStateUpdateListener.java | 35 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 45 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 75 ++-
.../VertexDataMovementEventsGeneratedEvent.java | 214 ------
.../VertexRecoverableEventsGeneratedEvent.java | 223 +++++++
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../tez/dag/app/dag/impl/TestTaskImpl.java | 10 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 3 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 648 ++++++++++++++++++-
.../dag/app/dag/impl/TestVertexRecovery.java | 16 +-
.../TestHistoryEventsProtoConversion.java | 10 +-
.../impl/TestHistoryEventJsonConversion.java | 4 +-
.../ats/TestHistoryEventTimelineConversion.java | 4 +-
.../org/apache/tez/test/TestDAGRecovery.java | 6 +-
20 files changed, 1290 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f29d48d..dd20f43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,10 @@ ALL CHANGES:
Release 0.5.1: Unreleased
+INCOMPATIBLE CHANGES
+ TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS
+ TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
+
ALL CHANGES
TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput()
TEZ-1515. Remove usage of ResourceBundles in Counters.
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 00896de..7b22b62 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -93,7 +93,10 @@ public abstract class InputInitializer {
* State changes will be received based on the registration via {@link
* org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
* java.util.Set)}. Notifications will be received for all registered state changes, and not just
- * for the latest state update. They will be in order in which the state change occurred.
+ * for the latest state update. They will be in order in which the state change occurred. </p>
+ *
+ * Extensive processing should not be performed via this method call. Instead this should just be
+ * used as a notification mechanism to the main initialization, which is via the initialize method.
*
* @param stateUpdate an event indicating the name of the vertex, and its updated state.
* Additional information may be available for specific events, Look at the
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 3c5e78e..8360447 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.tez.runtime.api.InputInitializer;
@@ -32,6 +33,13 @@ import org.apache.tez.runtime.api.Event;
/**
* An event that is routed to the specified {@link InputInitializer}.
* This can be used to send information/metadata to the {@link InputInitializer}
+ * <p>
+ *
+ * These events are routed to the InputInitializer, only after the task which generated the event
+ * succeeds. Also, the events will only be sent once per task - irrespective of how many attempts
+ * were run, or succeeded. An example of this is when a task is retried because the node on which it
+ * was running failed. If the Task had succeeded once, the event would already have been sent - and
+ * will not be resent when the task reruns and succeeds. </p>
*/
@Unstable
@Public
@@ -41,6 +49,7 @@ public class InputInitializerEvent extends Event {
private String targetInputName;
private ByteBuffer eventPayload;
+ private String sourceVertexName;
private InputInitializerEvent(String targetVertexName, String targetInputName,
ByteBuffer eventPayload) {
@@ -88,4 +97,18 @@ public class InputInitializerEvent extends Event {
public ByteBuffer getUserPayload() {
return eventPayload == null ? null : eventPayload.asReadOnlyBuffer();
}
+
+ @InterfaceAudience.Private
+ public void setSourceVertexName(String srcVertexName) {
+ this.sourceVertexName = srcVertexName;
+ }
+
+ /**
+ * Returns the name of the vertex which generated the event. This will only be populated after
+ * the event has been routed by the AM.
+ * @return the name of the source vertex
+ */
+ public String getSourceVertexName() {
+ return this.sourceVertexName;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9ba5847..85851c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -58,7 +58,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -207,7 +207,7 @@ public class RecoveryParser {
event = new TaskAttemptFinishedEvent();
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
default:
throw new IOException("Invalid data found, unknown event type "
@@ -865,8 +865,8 @@ public class RecoveryParser {
+ ", eventType=" + eventType
+ ", event=" + event.toString());
assert recoveredDAGData.recoveredDAG != null;
- VertexDataMovementEventsGeneratedEvent vEvent =
- (VertexDataMovementEventsGeneratedEvent) event;
+ VertexRecoverableEventsGeneratedEvent vEvent =
+ (VertexRecoverableEventsGeneratedEvent) event;
Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 770761e..87d4eb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -20,7 +20,10 @@ package org.apache.tez.dag.app.dag;
import javax.annotation.Nullable;
import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,10 +33,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
@@ -43,10 +49,12 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.*;
import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
@@ -61,6 +69,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
public class RootInputInitializerManager {
@@ -77,7 +86,8 @@ public class RootInputInitializerManager {
private final Vertex vertex;
private final AppContext appContext;
- private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
+ @VisibleForTesting
+ final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
public RootInputInitializerManager(Vertex vertex, AppContext appContext,
UserGroupInformation dagUgi, StateChangeNotifier stateTracker) {
@@ -100,7 +110,7 @@ public class RootInputInitializerManager {
InputInitializer initializer = createInitializer(input, context);
InitializerWrapper initializerWrapper =
- new InitializerWrapper(input, initializer, context, vertex, entityStateTracker);
+ new InitializerWrapper(input, initializer, context, vertex, entityStateTracker, appContext);
initializerMap.put(input.getName(), initializerWrapper);
ListenableFuture<List<Event>> future = executor
.submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -117,31 +127,40 @@ public class RootInputInitializerManager {
return initializer;
}
- public void handleInitializerEvent(InputInitializerEvent event) {
- Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
- "Received event for incorrect vertex");
- Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
- InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
- Preconditions.checkState(initializer != null,
- "Received event for unknown input : " + event.getTargetInputName());
+ public void handleInitializerEvents(List<TezEvent> events) {
+ ListMultimap<InitializerWrapper, TezEvent> eventMap = LinkedListMultimap.create();
+
+ for (TezEvent tezEvent : events) {
+ Preconditions.checkState(tezEvent.getEvent() instanceof InputInitializerEvent);
+ InputInitializerEvent event = (InputInitializerEvent)tezEvent.getEvent();
+ Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
+ "Received event for incorrect vertex");
+ Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
+ InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
+ Preconditions.checkState(initializer != null,
+ "Received event for unknown input : " + event.getTargetInputName());
+ eventMap.put(initializer, tezEvent);
+ }
+
// This is a restriction based on current flow - i.e. events generated only by initialize().
// TODO Rework the flow as per the first comment on TEZ-1076
if (isStopped) {
LOG.warn("InitializerManager already stopped for " + vertex.getLogIdentifier() +
- " Dropping event. [" + event + "]");
- return;
+ " Dropping " + events.size() + " events");
}
- if (initializer.isComplete()) {
- LOG.warn(
- "Event targeted at vertex " + vertex.getLogIdentifier() + ", initializerWrapper for Input: " +
- initializer.getInput().getName() +
- " will be dropped, since Input has already been initialized. [" + event + "]");
- }
- try {
- initializer.getInitializer().handleInputInitializerEvent(Lists.newArrayList(event));
- } catch (Exception e) {
- throw new TezUncheckedException(
- "Initializer for input: " + event.getTargetInputName() + " failed to process event", e);
+
+ for (Map.Entry<InitializerWrapper, Collection<TezEvent>> entry : eventMap.asMap().entrySet()) {
+ InitializerWrapper initializerWrapper = entry.getKey();
+ if (initializerWrapper.isComplete()) {
+ LOG.warn(entry.getValue().size() +
+ " events targeted at vertex " + vertex.getLogIdentifier() +
+ ", initializerWrapper for Input: " +
+ initializerWrapper.getInput().getName() +
+ " will be dropped, since Input has already been initialized.");
+ } else {
+ initializerWrapper.handleInputInitializerEvents(entry.getValue());
+ }
+
}
}
@@ -157,7 +176,13 @@ public class RootInputInitializerManager {
protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
}
-
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public InitializerWrapper getInitializerWrapper(String inputName) {
+ return initializerMap.get(inputName);
+ }
+
public void shutdown() {
if (executor != null && !isStopped) {
// Don't really care about what is running if an error occurs. If no error
@@ -232,7 +257,9 @@ public class RootInputInitializerManager {
}
}
- private static class InitializerWrapper implements VertexStateUpdateListener {
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public static class InitializerWrapper implements VertexStateUpdateListener, TaskStateUpdateListener {
private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
@@ -242,15 +269,18 @@ public class RootInputInitializerManager {
private final String vertexLogIdentifier;
private final StateChangeNotifier stateChangeNotifier;
private final List<String> notificationRegisteredVertices = Lists.newArrayList();
+ private final AppContext appContext;
InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
InputInitializer initializer, InputInitializerContext context,
- Vertex vertex, StateChangeNotifier stateChangeNotifier) {
+ Vertex vertex, StateChangeNotifier stateChangeNotifier,
+ AppContext appContext) {
this.input = input;
this.initializer = initializer;
this.context = context;
this.vertexLogIdentifier = vertex.getLogIdentifier();
this.stateChangeNotifier = stateChangeNotifier;
+ this.appContext = appContext;
}
public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
@@ -272,6 +302,7 @@ public class RootInputInitializerManager {
public void setComplete() {
this.isComplete.set(true);
unregisterForVertexStatusUpdates();
+ unregisterForTaskStatusUpdates();
}
public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
@@ -302,7 +333,118 @@ public class RootInputInitializerManager {
initializer.onVertexStateUpdated(event);
}
}
- }
+ private final Map<String, Map<Integer, Integer>> firstSuccessfulAttemptMap = new HashMap<String, Map<Integer, Integer>>();
+ private final ListMultimap<String, TezEvent> pendingEvents = LinkedListMultimap.create();
+ private final List<String> taskNotificationRegisteredVertices = Lists.newLinkedList();
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public Map<String, Map<Integer, Integer>> getFirstSuccessfulAttemptMap() {
+ return this.firstSuccessfulAttemptMap;
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public ListMultimap<String, TezEvent> getPendingEvents() {
+ return this.pendingEvents;
+ }
+
+    /**
+     * Handles a task-success notification for a source vertex.
+     * <p>
+     * Records the first successful attempt seen for the task, then scans the events queued for
+     * {@code vertexName}. Queued events produced by the succeeded task are removed from the
+     * queue; of those, only the ones produced by the recorded attempt are forwarded to the
+     * initializer. Events produced by other tasks stay queued.
+     *
+     * @param vertexName name of the vertex the succeeded task belongs to
+     * @param taskId id of the task that succeeded
+     * @param attemptId index of the attempt that succeeded
+     * @throws TezUncheckedException if {@code attemptId} is -1
+     */
+    @Override
+    public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+      // Notifications will only start coming in after an event is received, which is when we register for notifications.
+      // TODO TEZ-1577. Get rid of this.
+      if (attemptId == -1) {
+        throw new TezUncheckedException(
+            "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
+      }
+      Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
+      // The map is keyed by the integer task index, so the lookup must use taskId.getId().
+      // Looking up with the TezTaskID object itself never matches, which would overwrite the
+      // recorded attempt on every notification instead of keeping the first one.
+      int succeededTaskIndex = taskId.getId();
+      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(succeededTaskIndex);
+      if (successfulAttempt == null) {
+        successfulAttempt = attemptId;
+        vertexSuccessfulAttemptMap.put(succeededTaskIndex, successfulAttempt);
+      }
+
+      // Run through all the pending events for this srcVertex to see if any of them need to be dispatched.
+      List<TezEvent> events = pendingEvents.get(vertexName);
+      if (events != null && !events.isEmpty()) {
+        List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+        Iterator<TezEvent> eventIterator = events.iterator();
+        while (eventIterator.hasNext()) {
+          TezEvent tezEvent = eventIterator.next();
+          int eventTaskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+          int eventAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+          if (eventTaskIndex == succeededTaskIndex) {
+            // Process only if there's a pending event for the specific succeeded task
+            if (eventAttemptIndex == successfulAttempt) {
+              toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+            }
+            // Events from any other attempt of this task are discarded rather than forwarded.
+            eventIterator.remove();
+          }
+        }
+        sendEvents(toForwardEvents);
+      }
+    }
+
+ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
+ List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+ for (TezEvent tezEvent : tezEvents) {
+ String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
+ int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+ int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+
+ Map<Integer, Integer> vertexSuccessfulAttemptMap =
+ firstSuccessfulAttemptMap.get(srcVertexName);
+ if (vertexSuccessfulAttemptMap == null) {
+ vertexSuccessfulAttemptMap = new HashMap<Integer, Integer>();
+ firstSuccessfulAttemptMap.put(srcVertexName, vertexSuccessfulAttemptMap);
+ // Seen first time. Register for task updates
+ stateChangeNotifier.registerForTaskSuccessUpdates(srcVertexName, this);
+ taskNotificationRegisteredVertices.add(srcVertexName);
+ }
+
+ // Determine the successful attempt for the task
+ Integer successfulAttemptInteger = vertexSuccessfulAttemptMap.get(taskIndex);
+ if (successfulAttemptInteger == null) {
+ // Check immediately if this task has succeeded, in case the notification came in before the event
+ Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
+ Task task = srcVertex.getTask(taskIndex);
+ if (task.getState() == TaskState.SUCCEEDED) {
+ successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
+ vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
+ }
+ }
+
+ if (successfulAttemptInteger == null) {
+ // Queue events and await a notification
+ pendingEvents.put(srcVertexName, tezEvent);
+ } else {
+ // Handle the event immediately.
+ if (taskAttemptIndex == successfulAttemptInteger) {
+ toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+ } // Otherwise the event can be dropped
+ }
+ }
+ sendEvents(toForwardEvents);
+ }
+
+ private void sendEvents(List<InputInitializerEvent> events) {
+ if (events != null && !events.isEmpty()) {
+ try {
+ initializer.handleInputInitializerEvent(events);
+ } catch (Exception e) {
+ throw new TezUncheckedException(
+ "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() +
+ " failed to process events", e);
+ }
+ }
+ }
+
+ private void unregisterForTaskStatusUpdates() {
+ for (String vertexName : taskNotificationRegisteredVertices) {
+ stateChangeNotifier.unregisterForTaskSuccessUpdates(vertexName, this);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 558fc61..dc18e9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -33,6 +33,7 @@ import com.google.common.collect.SetMultimap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
/**
@@ -55,13 +56,11 @@ public class StateChangeNotifier {
this.lastKnowStatesMap = LinkedListMultimap.create();
}
+ // -------------- VERTEX STATE CHANGE SECTION ---------------
public void registerForVertexUpdates(String vertexName,
Set<org.apache.tez.dag.api.event.VertexState> stateSet,
VertexStateUpdateListener listener) {
- Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
- Vertex vertex = dag.getVertex(vertexName);
- Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
- TezVertexID vertexId = vertex.getVertexId();
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
writeLock.lock();
// Read within the lock, to ensure a consistent view is seen.
List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
@@ -88,14 +87,8 @@ public class StateChangeNotifier {
}
}
- // KKK Send out current state.
-
-
public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
- Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
- Vertex vertex = dag.getVertex(vertexName);
- Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
- TezVertexID vertexId = vertex.getVertexId();
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
writeLock.lock();
try {
ListenerContainer listenerContainer = new ListenerContainer(listener, null);
@@ -125,6 +118,7 @@ public class StateChangeNotifier {
}
+
private static final class ListenerContainer {
final VertexStateUpdateListener listener;
final Set<org.apache.tez.dag.api.event.VertexState> states;
@@ -165,4 +159,64 @@ public class StateChangeNotifier {
return System.identityHashCode(listener);
}
}
+
+ // -------------- END OF VERTEX STATE CHANGE SECTION ---------------
+
+ // -------------- TASK STATE CHANGE SECTION ---------------
+
+ // Task updates are not buffered, to avoid storing unnecessary information.
+ // Components (non user facing) which use this will only receive notifications for tasks
+ // that succeed after they have registered. For tasks that completed before registration,
+ // they have to query the task state themselves.
+ // Currently only handling Task SUCCESS events.
+ private final SetMultimap<TezVertexID, TaskStateUpdateListener> taskListeners =
+ Multimaps.synchronizedSetMultimap(HashMultimap.<TezVertexID, TaskStateUpdateListener>create());
+ private final ReentrantReadWriteLock taskListenerLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock taskReadLock = taskListenerLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock taskWriteLock = taskListenerLock.writeLock();
+
+
+
+  /**
+   * Adds a listener for task success notifications on the named vertex.
+   *
+   * The vertex name is resolved first, then the listener is null-checked; both checks
+   * happen before the task write lock is taken.
+   *
+   * @param vertexName name of the vertex whose task successes should be reported
+   * @param listener the listener to add; must not be null
+   */
+  public void registerForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+    TezVertexID targetVertexId = validateAndGetVertexId(vertexName);
+    Preconditions.checkNotNull(listener, "listener cannot be null");
+    taskWriteLock.lock();
+    try {
+      taskListeners.put(targetVertexId, listener);
+    } finally {
+      taskWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Removes a listener previously added for task success notifications on the named vertex.
+   *
+   * The vertex name is resolved first, then the listener is null-checked; both checks
+   * happen before the task write lock is taken.
+   *
+   * @param vertexName name of the vertex the listener was registered against
+   * @param listener the listener to remove; must not be null
+   */
+  public void unregisterForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+    TezVertexID targetVertexId = validateAndGetVertexId(vertexName);
+    Preconditions.checkNotNull(listener, "listener cannot be null");
+    taskWriteLock.lock();
+    try {
+      taskListeners.remove(targetVertexId, listener);
+    } finally {
+      taskWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Notifies every listener registered for the task's vertex that the task succeeded.
+   *
+   * Listeners are looked up by the vertex ID taken from {@code taskId} and are invoked
+   * on the calling thread while the task read lock is held.
+   *
+   * @param vertexName name of the vertex, passed through to the listeners
+   * @param taskId ID of the task that succeeded
+   * @param attemptId attempt number passed through to the listeners
+   */
+  public void taskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+    taskReadLock.lock();
+    try {
+      Set<TaskStateUpdateListener> registered = taskListeners.get(taskId.getVertexID());
+      for (TaskStateUpdateListener registeredListener : registered) {
+        registeredListener.onTaskSucceeded(vertexName, taskId, attemptId);
+      }
+    } finally {
+      taskReadLock.unlock();
+    }
+  }
+
+ // -------------- END OF TASK STATE CHANGE SECTION ---------------
+
+
+  /**
+   * Resolves a vertex name against the DAG and returns the ID of the matching vertex.
+   *
+   * @param vertexName name of the vertex to look up; must not be null
+   * @return the ID of the vertex with that name
+   * @throws NullPointerException if the name is null or the DAG has no such vertex
+   */
+  private TezVertexID validateAndGetVertexId(String vertexName) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+    // checkNotNull hands back its argument, so the lookup and the check fold together.
+    Vertex resolved = Preconditions.checkNotNull(dag.getVertex(vertexName),
+        "Vertex does not exist: " + vertexName);
+    return resolved.getVertexId();
+  }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
new file mode 100644
index 0000000..7c86991
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Listener that is notified when a task succeeds.
+ *
+ * <p>This interface should not be implemented by user facing APIs such as InputInitializer.
+ */
+@InterfaceAudience.Private
+public interface TaskStateUpdateListener {
+
+ /**
+ * Invoked when a task succeeds. Internal usage only; currently only success notifications
+ * are supported for tasks. Exposing the taskID is ok, since this interface isn't public.
+ *
+ * @param vertexName name of the vertex the task belongs to
+ * @param taskId ID of the task that succeeded
+ * @param attemptId numeric ID of the successful attempt; the TaskImpl state callback
+ * passes -1 when the task reports no successful attempt
+ */
+ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1dd711b..976f10f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
@@ -53,6 +53,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -88,6 +89,8 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
/**
* Implementation of Task interface.
@@ -118,6 +121,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private final ContainerContext containerContext;
@VisibleForTesting
long scheduledTime;
+ final StateChangeNotifier stateChangeNotifier;
private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
@@ -138,6 +142,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Recovery related flags
boolean recoveryStartEventSeen = false;
+ private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
+
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
@@ -261,7 +267,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// create the topology tables
.installTopology();
- private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
+  /**
+   * Registers STATE_CHANGED_CALLBACK on this task's state machine as the state-entered
+   * callback for TaskStateInternal.SUCCEEDED.
+   */
+  private void augmentStateMachine() {
+    stateMachine.registerStateEnteredCallback(
+        TaskStateInternal.SUCCEEDED, STATE_CHANGED_CALLBACK);
+  }
+
+ private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
stateMachine;
// TODO: Recovery
@@ -318,7 +330,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptListener taskAttemptListener,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean leafVertex, Resource resource,
- ContainerContext containerContext) {
+ ContainerContext containerContext,
+ StateChangeNotifier stateChangeNotifier) {
this.conf = conf;
this.clock = clock;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -333,11 +346,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
this.appContext = appContext;
+ this.stateChangeNotifier = stateChangeNotifier;
this.leafVertex = leafVertex;
this.taskResource = resource;
this.containerContext = containerContext;
- stateMachine = stateMachineFactory.make(this);
+ stateMachine = new StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>(
+ stateMachineFactory.make(this), this);
+ augmentStateMachine();
}
@Override
@@ -1423,6 +1439,27 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+  /**
+   * State-change callback that reports a task's success to its StateChangeNotifier.
+   * Only registered for SUCCEEDED notifications at the moment.
+   */
+  private static class TaskStateChangedCallback
+      implements OnStateChangedCallback<TaskStateInternal, TaskImpl> {
+
+    /**
+     * Forwards the vertex name, task ID and successful attempt number of {@code task}
+     * to the notifier.
+     *
+     * @param task the task whose state changed
+     * @param taskStateInternal the reported state; must be SUCCEEDED
+     */
+    @Override
+    public void onStateChanged(TaskImpl task, TaskStateInternal taskStateInternal) {
+      Preconditions.checkState(taskStateInternal == TaskStateInternal.SUCCEEDED);
+      TaskAttempt winningAttempt = task.getSuccessfulAttempt();
+      // TODO TEZ-1577.
+      // Falling back to -1 when there is no successful attempt is a horrible hack to get
+      // around recovery issues. Without this, recovery would fail for successful vertices.
+      // With this, recovery will end up failing for DAGs making use of InputInitializerEvents
+      int winningAttemptNumber =
+          (winningAttempt == null) ? -1 : winningAttempt.getID().getId();
+      task.stateChangeNotifier.taskSucceeded(
+          task.getVertex().getName(), task.getTaskId(), winningAttemptNumber);
+    }
+  }
+
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
if (commitAttempt != null && commitAttempt.equals(attempt)) {
LOG.info("Removing commit attempt: " + commitAttempt);
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6437e5b..594c651 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -127,7 +127,7 @@ import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
@@ -244,6 +244,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
private boolean vertexAlreadyInitialized = false;
+ @VisibleForTesting
+ final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
protected static final
StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
stateMachineFactory
@@ -261,6 +264,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.NEW),
VertexEventType.V_NULL_EDGE_INITIALIZED,
new NullEdgeInitializedTransition())
+ .addTransition(VertexState.NEW, VertexState.NEW,
+ VertexEventType.V_ROUTE_EVENT,
+ ROUTE_EVENT_TRANSITION)
+ .addTransition(VertexState.NEW, VertexState.NEW,
+ VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+ SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(VertexState.NEW,
EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -1079,8 +1088,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return recoveredState;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- VertexDataMovementEventsGeneratedEvent vEvent =
- (VertexDataMovementEventsGeneratedEvent) historyEvent;
+ VertexRecoverableEventsGeneratedEvent vEvent =
+ (VertexRecoverableEventsGeneratedEvent) historyEvent;
this.recoveredEvents.addAll(vEvent.getTezEvents());
return recoveredState;
default:
@@ -1774,7 +1783,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
(this.targetVertices != null ?
this.targetVertices.isEmpty() : true),
this.taskResource,
- conContext);
+ conContext,
+ this.stateChangeNotifier);
this.addTask(task);
if(LOG.isDebugEnabled()) {
LOG.debug("Created task for vertex " + logIdentifier + ": " +
@@ -2218,6 +2228,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.getVertexId(), Collections.singletonList(tezEvent), true));
}
continue;
+ } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) {
+ // The event has the relevant target information
+ InputInitializerEvent iiEvent = (InputInitializerEvent) tezEvent.getEvent();
+ iiEvent.setSourceVertexName(vertexName);
+ eventHandler.handle(new VertexEventRouteEvent(
+ getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(),
+ Collections.singletonList(tezEvent), true));
+ continue;
}
Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
@@ -2661,6 +2679,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ vertex.logIdentifier + ". Starting root input initializers: "
+ vertex.inputsWithInitializers.size());
vertex.rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+ vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+ vertex.pendingInitializerEvents.clear();
return VertexState.INITIALIZING;
} else {
boolean hasOneToOneUninitedSource = false;
@@ -2706,6 +2727,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// state. This is handled in RootInputInitializedTransition specially.
vertex.initWaitsForRootInitializers = true;
vertex.rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+ vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+ vertex.pendingInitializerEvents.clear();
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2795,6 +2819,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
// All inputs initialized, shutdown the initializer.
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
// done. check if we need to do the initialization
@@ -3064,6 +3089,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (vertex.rootInputInitializerManager != null) {
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
vertex.finished(VertexState.FAILED,
VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
@@ -3102,6 +3128,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
super.transition(vertex, event);
if (vertex.rootInputInitializerManager != null) {
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
}
}
@@ -3146,17 +3173,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " with state: " + completionEvent.getTaskAttemptState()
+ " vertexState: " + vertex.getState());
+
if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
.getTaskAttemptState())) {
vertex.numSuccessSourceAttemptCompletions++;
+
if (vertex.getState() == VertexState.RUNNING) {
+ // Inform the vertex manager about the source task completing.
vertex.vertexManager.onSourceTaskCompleted(completionEvent
.getTaskAttemptId().getTaskID());
} else {
vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
}
}
-
}
}
@@ -3349,7 +3378,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.getAppContext().isRecoveryEnabled()
&& !recovered
&& !tezEvents.isEmpty()) {
- List<TezEvent> dataMovementEvents =
+ List<TezEvent> recoveryEvents =
Lists.newArrayList();
for (TezEvent tezEvent : tezEvents) {
if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
@@ -3357,14 +3386,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
|| tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
- || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
- dataMovementEvents.add(tezEvent);
+ || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
+ || tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+ recoveryEvents.add(tezEvent);
}
}
- if (!dataMovementEvents.isEmpty()) {
- VertexDataMovementEventsGeneratedEvent historyEvent =
- new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
- dataMovementEvents);
+ if (!recoveryEvents.isEmpty()) {
+ VertexRecoverableEventsGeneratedEvent historyEvent =
+ new VertexRecoverableEventsGeneratedEvent(vertex.vertexId,
+ recoveryEvents);
vertex.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
}
@@ -3431,6 +3461,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case VERTEX_MANAGER_EVENT:
{
+ // VM events on task success only can be changed as part of TEZ-1532
VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
@@ -3449,9 +3480,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
- "Event sent to unkown vertex: " + riEvent.getTargetVertexName());
+ "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
+ riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
if (target == vertex) {
- vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
+ if (vertex.rootInputDescriptors == null ||
+ !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
+ throw new TezUncheckedException(
+ "InputInitializerEvent targeted at unknown initializer on vertex " +
+ vertex.logIdentifier + ", Event=" + riEvent);
+ }
+ if (vertex.getState() == VertexState.NEW) {
+ vertex.pendingInitializerEvents.add(tezEvent);
+ } else if (vertex.getState() == VertexState.INITIALIZING) {
+ vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
+ } else {
+ // Currently, INITED and subsequent states means Initializer complete / failure
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
+ }
+ }
} else {
checkEventSourceMetadata(vertex, sourceMeta);
vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
deleted file mode 100644
index 7ae73be..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.ProtoConverters;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-import com.google.common.collect.Lists;
-
-public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
-
- private static final Log LOG = LogFactory.getLog(
- VertexDataMovementEventsGeneratedEvent.class);
- private List<TezEvent> events;
- private TezVertexID vertexID;
-
- public VertexDataMovementEventsGeneratedEvent(TezVertexID vertexID,
- List<TezEvent> events) {
- this.vertexID = vertexID;
- this.events = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
- EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
- EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
- .contains(event.getEventType())) {
- this.events.add(event);
- }
- }
- if (events.isEmpty()) {
- throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
- + ", no data movement/information events provided");
- }
- }
-
- public VertexDataMovementEventsGeneratedEvent() {
- }
-
- @Override
- public HistoryEventType getEventType() {
- return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
- }
-
- @Override
- public boolean isRecoveryEvent() {
- return true;
- }
-
- @Override
- public boolean isHistoryEvent() {
- return false;
- }
-
- static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
- EventMetaData eventMetaData) {
- RecoveryProtos.EventMetaDataProto.Builder builder =
- RecoveryProtos.EventMetaDataProto.newBuilder()
- .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
- .setEdgeVertexName(eventMetaData.getEdgeVertexName())
- .setTaskVertexName(eventMetaData.getTaskVertexName());
- if (eventMetaData.getTaskAttemptID() != null) {
- builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
- }
- return builder.build();
- }
-
- static EventMetaData convertEventMetaDataFromProto(
- RecoveryProtos.EventMetaDataProto proto) {
- TezTaskAttemptID attemptID = null;
- if (proto.hasTaskAttemptId()) {
- attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
- }
- return new EventMetaData(
- EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
- proto.getTaskVertexName(),
- proto.getEdgeVertexName(),
- attemptID);
- }
-
- public VertexDataMovementEventsGeneratedProto toProto() {
- List<TezDataMovementEventProto> tezEventProtos = null;
- if (events != null) {
- tezEventProtos = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- TezDataMovementEventProto.Builder evtBuilder =
- TezDataMovementEventProto.newBuilder();
- if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
- evtBuilder.setCompositeDataMovementEvent(
- ProtoConverters.convertCompositeDataMovementEventToProto(
- (CompositeDataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
- evtBuilder.setDataMovementEvent(
- ProtoConverters.convertDataMovementEventToProto(
- (DataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
- evtBuilder.setRootInputDataInformationEvent(
- ProtoConverters.convertRootInputDataInformationEventToProto(
- (InputDataInformationEvent) event.getEvent()));
- }
- if (event.getSourceInfo() != null) {
- evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
- }
- if (event.getDestinationInfo() != null) {
- evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
- }
- tezEventProtos.add(evtBuilder.build());
- }
- }
- return VertexDataMovementEventsGeneratedProto.newBuilder()
- .setVertexId(vertexID.toString())
- .addAllTezDataMovementEvent(tezEventProtos)
- .build();
- }
-
- public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
- this.vertexID = TezVertexID.fromString(proto.getVertexId());
- int eventCount = proto.getTezDataMovementEventCount();
- if (eventCount > 0) {
- this.events = Lists.newArrayListWithCapacity(eventCount);
- }
- for (TezDataMovementEventProto eventProto :
- proto.getTezDataMovementEventList()) {
- Event evt = null;
- if (eventProto.hasCompositeDataMovementEvent()) {
- evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
- eventProto.getCompositeDataMovementEvent());
- } else if (eventProto.hasDataMovementEvent()) {
- evt = ProtoConverters.convertDataMovementEventFromProto(
- eventProto.getDataMovementEvent());
- } else if (eventProto.hasRootInputDataInformationEvent()) {
- evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
- eventProto.getRootInputDataInformationEvent());
- }
- EventMetaData sourceInfo = null;
- EventMetaData destinationInfo = null;
- if (eventProto.hasSourceInfo()) {
- sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
- }
- if (eventProto.hasDestinationInfo()) {
- destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
- }
- TezEvent tezEvent = new TezEvent(evt, sourceInfo);
- tezEvent.setDestinationInfo(destinationInfo);
- this.events.add(tezEvent);
- }
- }
-
- @Override
- public void toProtoStream(OutputStream outputStream) throws IOException {
- toProto().writeDelimitedTo(outputStream);
- }
-
- @Override
- public void fromProtoStream(InputStream inputStream) throws IOException {
- VertexDataMovementEventsGeneratedProto proto =
- VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
- if (proto == null) {
- throw new IOException("No data found in stream");
- }
- fromProto(proto);
- }
-
- @Override
- public String toString() {
- return "vertexId=" + vertexID.toString()
- + ", eventCount=" + (events != null ? events.size() : "null");
-
- }
-
- public TezVertexID getVertexID() {
- return this.vertexID;
- }
-
- public List<TezEvent> getTezEvents() {
- return this.events;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
new file mode 100644
index 0000000..a9f1fd2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.collect.Lists;
+
+// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent
+/**
+ * Recovery-only history event ({@link #isRecoveryEvent()} is {@code true},
+ * {@link #isHistoryEvent()} is {@code false}) that records the {@link TezEvent}s
+ * generated for a vertex.
+ *
+ * <p>Only data movement, composite data movement, root input data information and
+ * root input initializer events are retained; events of any other type handed to the
+ * constructor are dropped. The record is serialized as a
+ * {@link VertexDataMovementEventsGeneratedProto}; the proto message and the
+ * {@link HistoryEventType} constant still use the older "DataMovementEvents" naming.
+ */
+public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
+
+  private static final Log LOG = LogFactory.getLog(
+      VertexRecoverableEventsGeneratedEvent.class);
+
+  /** Event types kept by this record. Built once; must never be mutated. */
+  private static final EnumSet<EventType> RECOVERABLE_EVENT_TYPES = EnumSet.of(
+      EventType.DATA_MOVEMENT_EVENT,
+      EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
+      EventType.ROOT_INPUT_DATA_INFORMATION_EVENT,
+      EventType.ROOT_INPUT_INITIALIZER_EVENT);
+
+  /** Retained events; {@code null} until populated by the constructor or fromProto. */
+  private List<TezEvent> events;
+  private TezVertexID vertexID;
+
+  /**
+   * Creates a record for the given vertex, keeping only the recoverable events.
+   *
+   * @param vertexID id of the vertex the events belong to
+   * @param events candidate events; must not be {@code null}
+   * @throws RuntimeException if none of the supplied events is of a recoverable type
+   */
+  public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID,
+      List<TezEvent> events) {
+    this.vertexID = vertexID;
+    this.events = Lists.newArrayListWithCapacity(events.size());
+    for (TezEvent event : events) {
+      if (RECOVERABLE_EVENT_TYPES.contains(event.getEventType())) {
+        this.events.add(event);
+      }
+    }
+    // Validate the filtered list rather than the argument: a non-empty input that
+    // contains no recoverable event must not produce an empty record.
+    if (this.events.isEmpty()) {
+      throw new RuntimeException("Invalid creation of VertexRecoverableEventsGeneratedEvent"
+          + ", no data movement/information/initializer events provided");
+    }
+  }
+
+  /** Creates an empty instance to be populated through {@link #fromProto} or
+   * {@link #fromProtoStream}. */
+  public VertexRecoverableEventsGeneratedEvent() {
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    // The enum constant keeps its pre-rename name.
+    return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Converts event metadata to its proto form. The task attempt id is written only
+   * when present.
+   *
+   * @param eventMetaData metadata to convert; must not be {@code null}
+   * @return the populated proto
+   */
+  static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
+      EventMetaData eventMetaData) {
+    // NOTE: the generator type is stored as its enum ordinal, so reordering
+    // EventProducerConsumerType changes the meaning of already-written records.
+    RecoveryProtos.EventMetaDataProto.Builder builder =
+        RecoveryProtos.EventMetaDataProto.newBuilder()
+            .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
+            .setEdgeVertexName(eventMetaData.getEdgeVertexName())
+            .setTaskVertexName(eventMetaData.getTaskVertexName());
+    if (eventMetaData.getTaskAttemptID() != null) {
+      builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds event metadata from its proto form; the task attempt id is {@code null}
+   * when the proto does not carry one.
+   *
+   * @param proto proto to convert
+   * @return the reconstructed metadata
+   */
+  static EventMetaData convertEventMetaDataFromProto(
+      RecoveryProtos.EventMetaDataProto proto) {
+    TezTaskAttemptID attemptID = null;
+    if (proto.hasTaskAttemptId()) {
+      attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+    }
+    return new EventMetaData(
+        EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
+        proto.getTaskVertexName(),
+        proto.getEdgeVertexName(),
+        attemptID);
+  }
+
+  /** Serializes one event: its payload (by type) plus optional source/destination info. */
+  private static TezDataMovementEventProto convertTezEventToProto(TezEvent event) {
+    TezDataMovementEventProto.Builder evtBuilder = TezDataMovementEventProto.newBuilder();
+    switch (event.getEventType()) {
+      case COMPOSITE_DATA_MOVEMENT_EVENT:
+        evtBuilder.setCompositeDataMovementEvent(
+            ProtoConverters.convertCompositeDataMovementEventToProto(
+                (CompositeDataMovementEvent) event.getEvent()));
+        break;
+      case DATA_MOVEMENT_EVENT:
+        evtBuilder.setDataMovementEvent(
+            ProtoConverters.convertDataMovementEventToProto(
+                (DataMovementEvent) event.getEvent()));
+        break;
+      case ROOT_INPUT_DATA_INFORMATION_EVENT:
+        evtBuilder.setRootInputDataInformationEvent(
+            ProtoConverters.convertRootInputDataInformationEventToProto(
+                (InputDataInformationEvent) event.getEvent()));
+        break;
+      case ROOT_INPUT_INITIALIZER_EVENT:
+        evtBuilder.setInputInitializerEvent(
+            ProtoConverters.convertRootInputInitializerEventToProto(
+                (InputInitializerEvent) event.getEvent()));
+        break;
+      default:
+        // Other types are filtered out by the constructor; no payload is written.
+        break;
+    }
+    if (event.getSourceInfo() != null) {
+      evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
+    }
+    if (event.getDestinationInfo() != null) {
+      evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
+    }
+    return evtBuilder.build();
+  }
+
+  /** Rebuilds one event from its proto; the payload is {@code null} if none is set. */
+  private static TezEvent convertTezEventFromProto(TezDataMovementEventProto eventProto) {
+    Event evt = null;
+    if (eventProto.hasCompositeDataMovementEvent()) {
+      evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
+          eventProto.getCompositeDataMovementEvent());
+    } else if (eventProto.hasDataMovementEvent()) {
+      evt = ProtoConverters.convertDataMovementEventFromProto(
+          eventProto.getDataMovementEvent());
+    } else if (eventProto.hasRootInputDataInformationEvent()) {
+      evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
+          eventProto.getRootInputDataInformationEvent());
+    } else if (eventProto.hasInputInitializerEvent()) {
+      evt = ProtoConverters.convertRootInputInitializerEventFromProto(
+          eventProto.getInputInitializerEvent());
+    }
+    EventMetaData sourceInfo = null;
+    EventMetaData destinationInfo = null;
+    if (eventProto.hasSourceInfo()) {
+      sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
+    }
+    if (eventProto.hasDestinationInfo()) {
+      destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
+    }
+    TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+    tezEvent.setDestinationInfo(destinationInfo);
+    return tezEvent;
+  }
+
+  /**
+   * Serializes this record. A record without events (for example one read back from
+   * a proto that carried none) produces a proto with an empty event list.
+   *
+   * @return the proto representation of this record
+   */
+  public VertexDataMovementEventsGeneratedProto toProto() {
+    // Always hand the builder a real list; passing null to addAll would throw.
+    List<TezDataMovementEventProto> tezEventProtos =
+        Lists.newArrayListWithCapacity(events == null ? 0 : events.size());
+    if (events != null) {
+      for (TezEvent event : events) {
+        tezEventProtos.add(convertTezEventToProto(event));
+      }
+    }
+    return VertexDataMovementEventsGeneratedProto.newBuilder()
+        .setVertexId(vertexID.toString())
+        .addAllTezDataMovementEvent(tezEventProtos)
+        .build();
+  }
+
+  /**
+   * Replaces the state of this record with the contents of {@code proto}. The event
+   * list is {@code null} afterwards when the proto carries no events.
+   *
+   * @param proto serialized record to read
+   */
+  public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    int eventCount = proto.getTezDataMovementEventCount();
+    if (eventCount == 0) {
+      // Drop any previously held events so they are not attributed to the new vertex.
+      this.events = null;
+      return;
+    }
+    this.events = Lists.newArrayListWithCapacity(eventCount);
+    for (TezDataMovementEventProto eventProto : proto.getTezDataMovementEventList()) {
+      this.events.add(convertTezEventFromProto(eventProto));
+    }
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited record from the stream.
+   *
+   * @throws IOException if the stream yields no message or cannot be read
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexDataMovementEventsGeneratedProto proto =
+        VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    // Plain concatenation prints "null" for an unset vertex id instead of throwing.
+    return "vertexId=" + vertexID
+        + ", eventCount=" + (events != null ? events.size() : "null");
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  /**
+   * @return the retained events (the internal list, not a copy), or {@code null} if
+   *         none have been set
+   */
+  public List<TezEvent> getTezEvents() {
+    return this.events;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 821612a..93f217f 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -180,6 +180,7 @@ message TezDataMovementEventProto {
optional DataMovementEventProto data_movement_event = 3;
optional CompositeEventProto composite_data_movement_event = 4;
optional RootInputDataInformationEventProto root_input_data_information_event = 5;
+ optional RootInputInitializerEventProto input_initializer_event = 6;
}
message VertexDataMovementEventsGeneratedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index f53643f..63cd9c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import java.util.ArrayList;
import java.util.HashMap;
@@ -53,6 +56,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
@@ -494,6 +498,8 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
+ verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+ eq(mockTask.getLastAttempt().getID().getId()));
eventHandler.events.clear();
// Now fail the attempt after it has succeeded
@@ -548,7 +554,7 @@ public class TestTaskImpl {
private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
private Vertex vertex;
TaskLocationHint locationHint;
-
+
public MockTaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Configuration conf,
TaskAttemptListener taskAttemptListener, Clock clock,
@@ -557,7 +563,7 @@ public class TestTaskImpl {
ContainerContext containerContext, Vertex vertex) {
super(vertexId, partition, eventHandler, conf, taskAttemptListener,
clock, thh, appContext, leafVertex, resource,
- containerContext);
+ containerContext, mock(StateChangeNotifier.class));
this.vertex = vertex;
this.locationHint = locationHint;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index c5153b6..bd13ffe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
@@ -186,7 +187,7 @@ public class TestTaskRecovery {
new Configuration(), mock(TaskAttemptListener.class),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class));
+ mock(ContainerContext.class), mock(StateChangeNotifier.class));
Map<String, OutputCommitter> committers =
new HashMap<String, OutputCommitter>();