You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:54 UTC
[09/25] git commit: TEZ-1539. Change InputInitializerEvent semantics
to SEND_ONCE_ON_TASK_SUCCESS. (sseth)
TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b4580a7b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b4580a7b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b4580a7b
Branch: refs/heads/TEZ-8
Commit: b4580a7b81ab1a5619987296641ebbf50fda4c55
Parents: fb05aac
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 12 02:37:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 12 02:37:11 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../tez/runtime/api/InputInitializer.java | 5 +-
.../api/events/InputInitializerEvent.java | 23 +
.../org/apache/tez/dag/app/RecoveryParser.java | 8 +-
.../app/dag/RootInputInitializerManager.java | 194 +++++-
.../tez/dag/app/dag/StateChangeNotifier.java | 76 ++-
.../dag/app/dag/TaskStateUpdateListener.java | 35 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 45 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 75 ++-
.../VertexDataMovementEventsGeneratedEvent.java | 214 ------
.../VertexRecoverableEventsGeneratedEvent.java | 223 +++++++
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../tez/dag/app/dag/impl/TestTaskImpl.java | 10 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 3 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 648 ++++++++++++++++++-
.../dag/app/dag/impl/TestVertexRecovery.java | 16 +-
.../TestHistoryEventsProtoConversion.java | 10 +-
.../impl/TestHistoryEventJsonConversion.java | 4 +-
.../ats/TestHistoryEventTimelineConversion.java | 4 +-
.../org/apache/tez/test/TestDAGRecovery.java | 6 +-
20 files changed, 1290 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f29d48d..dd20f43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,10 @@ ALL CHANGES:
Release 0.5.1: Unreleased
+INCOMPATIBLE CHANGES
+ TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS
+ TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
+
ALL CHANGES
TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput()
TEZ-1515. Remove usage of ResourceBundles in Counters.
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 00896de..7b22b62 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -93,7 +93,10 @@ public abstract class InputInitializer {
* State changes will be received based on the registration via {@link
* org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
* java.util.Set)}. Notifications will be received for all registered state changes, and not just
- * for the latest state update. They will be in order in which the state change occurred.
+ * for the latest state update. They will be in order in which the state change occurred. </p>
+ *
+ * Extensive processing should not be performed via this method call. Instead this should just be
+ * used as a notification mechanism to the main initialization, which is via the initialize method.
*
* @param stateUpdate an event indicating the name of the vertex, and it's updated state.
* Additional information may be available for specific events, Look at the
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 3c5e78e..8360447 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.tez.runtime.api.InputInitializer;
@@ -32,6 +33,13 @@ import org.apache.tez.runtime.api.Event;
/**
* An event that is routed to the specified {@link InputInitializer}.
* This can be used to send information/metadata to the {@link InputInitializer}
+ * <p>
+ *
+ * These events are routed to the InputInitializer, only after the task which generated the event
+ * succeeds. Also, the events will only be sent once per task - irrespective of how many attempts
+ * were run, or succeeded. An example of this is when a task is retried because the node on which it
+ * was running failed. If the Task had succeeded once, the event would already have been sent - and
+ * will not be resent when the task reruns and succeeds. </p>
*/
@Unstable
@Public
@@ -41,6 +49,7 @@ public class InputInitializerEvent extends Event {
private String targetInputName;
private ByteBuffer eventPayload;
+ private String sourceVertexName;
private InputInitializerEvent(String targetVertexName, String targetInputName,
ByteBuffer eventPayload) {
@@ -88,4 +97,18 @@ public class InputInitializerEvent extends Event {
public ByteBuffer getUserPayload() {
return eventPayload == null ? null : eventPayload.asReadOnlyBuffer();
}
+
+ @InterfaceAudience.Private
+ public void setSourceVertexName(String srcVertexName) {
+ this.sourceVertexName = srcVertexName;
+ }
+
+ /**
+ * Returns the name of the vertex which generated the event. This will only be populated after
+ * the event has been routed by the AM.
+ * @return the name of the source vertex
+ */
+ public String getSourceVertexName() {
+ return this.sourceVertexName;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9ba5847..85851c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -58,7 +58,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -207,7 +207,7 @@ public class RecoveryParser {
event = new TaskAttemptFinishedEvent();
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
+ event = new VertexRecoverableEventsGeneratedEvent();
break;
default:
throw new IOException("Invalid data found, unknown event type "
@@ -865,8 +865,8 @@ public class RecoveryParser {
+ ", eventType=" + eventType
+ ", event=" + event.toString());
assert recoveredDAGData.recoveredDAG != null;
- VertexDataMovementEventsGeneratedEvent vEvent =
- (VertexDataMovementEventsGeneratedEvent) event;
+ VertexRecoverableEventsGeneratedEvent vEvent =
+ (VertexRecoverableEventsGeneratedEvent) event;
Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 770761e..87d4eb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -20,7 +20,10 @@ package org.apache.tez.dag.app.dag;
import javax.annotation.Nullable;
import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,10 +33,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
@@ -43,10 +49,12 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.*;
import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
@@ -61,6 +69,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
public class RootInputInitializerManager {
@@ -77,7 +86,8 @@ public class RootInputInitializerManager {
private final Vertex vertex;
private final AppContext appContext;
- private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
+ @VisibleForTesting
+ final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
public RootInputInitializerManager(Vertex vertex, AppContext appContext,
UserGroupInformation dagUgi, StateChangeNotifier stateTracker) {
@@ -100,7 +110,7 @@ public class RootInputInitializerManager {
InputInitializer initializer = createInitializer(input, context);
InitializerWrapper initializerWrapper =
- new InitializerWrapper(input, initializer, context, vertex, entityStateTracker);
+ new InitializerWrapper(input, initializer, context, vertex, entityStateTracker, appContext);
initializerMap.put(input.getName(), initializerWrapper);
ListenableFuture<List<Event>> future = executor
.submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -117,31 +127,40 @@ public class RootInputInitializerManager {
return initializer;
}
- public void handleInitializerEvent(InputInitializerEvent event) {
- Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
- "Received event for incorrect vertex");
- Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
- InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
- Preconditions.checkState(initializer != null,
- "Received event for unknown input : " + event.getTargetInputName());
+ /** Validates the events, groups them by target initializer and forwards them; all are dropped once stopped. */
+ public void handleInitializerEvents(List<TezEvent> events) {
+ ListMultimap<InitializerWrapper, TezEvent> eventMap = LinkedListMultimap.create();
+ for (TezEvent tezEvent : events) {
+ Preconditions.checkState(tezEvent.getEvent() instanceof InputInitializerEvent);
+ InputInitializerEvent event = (InputInitializerEvent)tezEvent.getEvent();
+ Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
+ "Received event for incorrect vertex");
+ Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
+ InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
+ Preconditions.checkState(initializer != null,
+ "Received event for unknown input : " + event.getTargetInputName());
+ eventMap.put(initializer, tezEvent);
+ }
+
// This is a restriction based on current flow - i.e. events generated only by initialize().
// TODO Rework the flow as per the first comment on TEZ-1076
if (isStopped) {
LOG.warn("InitializerManager already stopped for " + vertex.getLogIdentifier() +
- " Dropping event. [" + event + "]");
- return;
+ " Dropping " + events.size() + " events");
+ return; // Stopped: actually drop the batch, as the warning above states.
}
- if (initializer.isComplete()) {
- LOG.warn(
- "Event targeted at vertex " + vertex.getLogIdentifier() + ", initializerWrapper for Input: " +
- initializer.getInput().getName() +
- " will be dropped, since Input has already been initialized. [" + event + "]");
- }
- try {
- initializer.getInitializer().handleInputInitializerEvent(Lists.newArrayList(event));
- } catch (Exception e) {
- throw new TezUncheckedException(
- "Initializer for input: " + event.getTargetInputName() + " failed to process event", e);
+
+ for (Map.Entry<InitializerWrapper, Collection<TezEvent>> entry : eventMap.asMap().entrySet()) {
+ InitializerWrapper initializerWrapper = entry.getKey();
+ if (initializerWrapper.isComplete()) {
+ LOG.warn(entry.getValue().size() +
+ " events targeted at vertex " + vertex.getLogIdentifier() +
+ ", initializerWrapper for Input: " +
+ initializerWrapper.getInput().getName() +
+ " will be dropped, since Input has already been initialized.");
+ } else {
+ initializerWrapper.handleInputInitializerEvents(entry.getValue());
+ }
}
}
@@ -157,7 +176,13 @@ public class RootInputInitializerManager {
protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
}
-
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public InitializerWrapper getInitializerWrapper(String inputName) {
+ return initializerMap.get(inputName);
+ }
+
public void shutdown() {
if (executor != null && !isStopped) {
// Don't really care about what is running if an error occurs. If no error
@@ -232,7 +257,9 @@ public class RootInputInitializerManager {
}
}
- private static class InitializerWrapper implements VertexStateUpdateListener {
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public static class InitializerWrapper implements VertexStateUpdateListener, TaskStateUpdateListener {
private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
@@ -242,15 +269,18 @@ public class RootInputInitializerManager {
private final String vertexLogIdentifier;
private final StateChangeNotifier stateChangeNotifier;
private final List<String> notificationRegisteredVertices = Lists.newArrayList();
+ private final AppContext appContext;
InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
InputInitializer initializer, InputInitializerContext context,
- Vertex vertex, StateChangeNotifier stateChangeNotifier) {
+ Vertex vertex, StateChangeNotifier stateChangeNotifier,
+ AppContext appContext) {
this.input = input;
this.initializer = initializer;
this.context = context;
this.vertexLogIdentifier = vertex.getLogIdentifier();
this.stateChangeNotifier = stateChangeNotifier;
+ this.appContext = appContext;
}
public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
@@ -272,6 +302,7 @@ public class RootInputInitializerManager {
public void setComplete() {
this.isComplete.set(true);
unregisterForVertexStatusUpdates();
+ unregisterForTaskStatusUpdates();
}
public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
@@ -302,7 +333,118 @@ public class RootInputInitializerManager {
initializer.onVertexStateUpdated(event);
}
}
- }
+ private final Map<String, Map<Integer, Integer>> firstSuccessfulAttemptMap = new HashMap<String, Map<Integer, Integer>>();
+ private final ListMultimap<String, TezEvent> pendingEvents = LinkedListMultimap.create();
+ private final List<String> taskNotificationRegisteredVertices = Lists.newLinkedList();
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public Map<String, Map<Integer, Integer>> getFirstSuccessfulAttemptMap() {
+ return this.firstSuccessfulAttemptMap;
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public ListMultimap<String, TezEvent> getPendingEvents() {
+ return this.pendingEvents;
+ }
+
+ @Override
+ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+ // Notifications will only start coming in after an event is received, which is when we register for notifications.
+ // TODO TEZ-1577. Get rid of this. TaskImpl reports -1 when the task has no recorded successful attempt.
+ if (attemptId == -1) {
+ throw new TezUncheckedException(
+ "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
+ }
+ Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
+ // Keyed by the integer task index (see the put below); a TezTaskID key would never match.
+ Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId());
+ if (successfulAttempt == null) {
+ successfulAttempt = attemptId;
+ vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);
+ }
+ // Run through all the pending events for this srcVertex to see if any of them need to be dispatched.
+ List<TezEvent> events = pendingEvents.get(vertexName);
+ if (events != null && !events.isEmpty()) {
+ List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+ Iterator<TezEvent> eventIterator = events.iterator();
+ while (eventIterator.hasNext()) {
+ TezEvent tezEvent = eventIterator.next();
+ int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+ int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+ if (taskIndex == taskId.getId()) {
+ // Events of the succeeded task leave the pending list; only the first successful attempt's are forwarded.
+ if (taskAttemptIndex == successfulAttempt) {
+ toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+ }
+ eventIterator.remove();
+ }
+ }
+ sendEvents(toForwardEvents);
+ }
+ }
+
+ public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
+ List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+ for (TezEvent tezEvent : tezEvents) {
+ String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
+ int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+ int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+
+ Map<Integer, Integer> vertexSuccessfulAttemptMap =
+ firstSuccessfulAttemptMap.get(srcVertexName);
+ if (vertexSuccessfulAttemptMap == null) {
+ vertexSuccessfulAttemptMap = new HashMap<Integer, Integer>();
+ firstSuccessfulAttemptMap.put(srcVertexName, vertexSuccessfulAttemptMap);
+ // Seen first time. Register for task updates
+ stateChangeNotifier.registerForTaskSuccessUpdates(srcVertexName, this);
+ taskNotificationRegisteredVertices.add(srcVertexName);
+ }
+
+ // Determine the successful attempt for the task
+ Integer successfulAttemptInteger = vertexSuccessfulAttemptMap.get(taskIndex);
+ if (successfulAttemptInteger == null) {
+ // Check immediately if this task has succeeded, in case the notification came in before the event
+ Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
+ Task task = srcVertex.getTask(taskIndex);
+ if (task.getState() == TaskState.SUCCEEDED) {
+ successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
+ vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
+ }
+ }
+
+ if (successfulAttemptInteger == null) {
+ // Queue events and await a notification
+ pendingEvents.put(srcVertexName, tezEvent);
+ } else {
+ // Handle the event immediately.
+ if (taskAttemptIndex == successfulAttemptInteger) {
+ toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+ } // Otherwise the event can be dropped
+ }
+ }
+ sendEvents(toForwardEvents);
+ }
+
+ private void sendEvents(List<InputInitializerEvent> events) {
+ if (events != null && !events.isEmpty()) {
+ try {
+ initializer.handleInputInitializerEvent(events);
+ } catch (Exception e) {
+ throw new TezUncheckedException(
+ "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() +
+ " failed to process events", e);
+ }
+ }
+ }
+
+ private void unregisterForTaskStatusUpdates() {
+ for (String vertexName : taskNotificationRegisteredVertices) {
+ stateChangeNotifier.unregisterForTaskSuccessUpdates(vertexName, this);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 558fc61..dc18e9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -33,6 +33,7 @@ import com.google.common.collect.SetMultimap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
/**
@@ -55,13 +56,11 @@ public class StateChangeNotifier {
this.lastKnowStatesMap = LinkedListMultimap.create();
}
+ // -------------- VERTEX STATE CHANGE SECTION ---------------
public void registerForVertexUpdates(String vertexName,
Set<org.apache.tez.dag.api.event.VertexState> stateSet,
VertexStateUpdateListener listener) {
- Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
- Vertex vertex = dag.getVertex(vertexName);
- Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
- TezVertexID vertexId = vertex.getVertexId();
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
writeLock.lock();
// Read within the lock, to ensure a consistent view is seen.
List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
@@ -88,14 +87,8 @@ public class StateChangeNotifier {
}
}
- // KKK Send out current state.
-
-
public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
- Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
- Vertex vertex = dag.getVertex(vertexName);
- Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
- TezVertexID vertexId = vertex.getVertexId();
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
writeLock.lock();
try {
ListenerContainer listenerContainer = new ListenerContainer(listener, null);
@@ -125,6 +118,7 @@ public class StateChangeNotifier {
}
+
private static final class ListenerContainer {
final VertexStateUpdateListener listener;
final Set<org.apache.tez.dag.api.event.VertexState> states;
@@ -165,4 +159,64 @@ public class StateChangeNotifier {
return System.identityHashCode(listener);
}
}
+
+ // -------------- END OF VERTEX STATE CHANGE SECTION ---------------
+
+ // -------------- TASK STATE CHANGE SECTION ---------------
+
+ // Task updates are not buffered to avoid storing unnecessary information.
+ // Components (non user facing) which use this will receive notifications after registration.
+ // They will have to query task states, prior to registration.
+ // Currently only handling Task SUCCESS events.
+ private final SetMultimap<TezVertexID, TaskStateUpdateListener> taskListeners =
+ Multimaps.synchronizedSetMultimap(HashMultimap.<TezVertexID, TaskStateUpdateListener>create());
+ private final ReentrantReadWriteLock taskListenerLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock taskReadLock = taskListenerLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock taskWriteLock = taskListenerLock.writeLock();
+
+
+
+ public void registerForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
+ Preconditions.checkNotNull(listener, "listener cannot be null");
+ taskWriteLock.lock();
+ try {
+ taskListeners.put(vertexId, listener);
+ } finally {
+ taskWriteLock.unlock();
+ }
+ }
+
+ public void unregisterForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+ TezVertexID vertexId = validateAndGetVertexId(vertexName);
+ Preconditions.checkNotNull(listener, "listener cannot be null");
+ taskWriteLock.lock();
+ try {
+ taskListeners.remove(vertexId, listener);
+ } finally {
+ taskWriteLock.unlock();
+ }
+ }
+
+ public void taskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+ taskReadLock.lock();
+ try {
+ for (TaskStateUpdateListener listener : taskListeners.get(taskId.getVertexID())) {
+ listener.onTaskSucceeded(vertexName, taskId, attemptId);
+ }
+ } finally {
+ taskReadLock.unlock();
+ }
+ }
+
+ // -------------- END OF TASK STATE CHANGE SECTION ---------------
+
+
+ private TezVertexID validateAndGetVertexId(String vertexName) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+ Vertex vertex = dag.getVertex(vertexName);
+ Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+ return vertex.getVertexId();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
new file mode 100644
index 0000000..7c86991
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Internal listener; should not be implemented by user facing APIs such as InputInitializer.
+ */
+@InterfaceAudience.Private
+public interface TaskStateUpdateListener {
+ /**
+ * Internal usage only; only task success is notified currently. Exposing the taskID is ok, since this isn't public.
+ */
+ public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1dd711b..976f10f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
@@ -53,6 +53,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -88,6 +89,8 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
/**
* Implementation of Task interface.
@@ -118,6 +121,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private final ContainerContext containerContext;
@VisibleForTesting
long scheduledTime;
+ final StateChangeNotifier stateChangeNotifier;
private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
@@ -138,6 +142,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Recovery related flags
boolean recoveryStartEventSeen = false;
+ private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
+
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
@@ -261,7 +267,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// create the topology tables
.installTopology();
- private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
+ private void augmentStateMachine() {
+ stateMachine
+ .registerStateEnteredCallback(TaskStateInternal.SUCCEEDED,
+ STATE_CHANGED_CALLBACK);
+ }
+
+ private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
stateMachine;
// TODO: Recovery
@@ -318,7 +330,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptListener taskAttemptListener,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean leafVertex, Resource resource,
- ContainerContext containerContext) {
+ ContainerContext containerContext,
+ StateChangeNotifier stateChangeNotifier) {
this.conf = conf;
this.clock = clock;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -333,11 +346,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
this.appContext = appContext;
+ this.stateChangeNotifier = stateChangeNotifier;
this.leafVertex = leafVertex;
this.taskResource = resource;
this.containerContext = containerContext;
- stateMachine = stateMachineFactory.make(this);
+ stateMachine = new StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>(
+ stateMachineFactory.make(this), this);
+ augmentStateMachine();
}
@Override
@@ -1423,6 +1439,27 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ private static class TaskStateChangedCallback
+ implements OnStateChangedCallback<TaskStateInternal, TaskImpl> {
+
+ @Override
+ public void onStateChanged(TaskImpl task, TaskStateInternal taskStateInternal) {
+ // Only registered for SUCCEEDED notifications at the moment
+ Preconditions.checkState(taskStateInternal == TaskStateInternal.SUCCEEDED);
+ TaskAttempt successfulAttempt = task.getSuccessfulAttempt();
+ // TODO TEZ-1577.
+ // This is a horrible hack to get around recovery issues. Without this, recovery would fail
+ // for successful vertices.
+ // With this, recovery will end up failing for DAGs making use of InputInitializerEvents
+ int succesfulAttemptInt = -1;
+ if (successfulAttempt != null) {
+ succesfulAttemptInt = successfulAttempt.getID().getId();
+ }
+ task.stateChangeNotifier.taskSucceeded(task.getVertex().getName(), task.getTaskId(),
+ succesfulAttemptInt);
+ }
+ }
+
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
if (commitAttempt != null && commitAttempt.equals(attempt)) {
LOG.info("Removing commit attempt: " + commitAttempt);
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6437e5b..594c651 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -127,7 +127,7 @@ import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
@@ -244,6 +244,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
private boolean vertexAlreadyInitialized = false;
+ @VisibleForTesting
+ final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
protected static final
StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
stateMachineFactory
@@ -261,6 +264,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.NEW),
VertexEventType.V_NULL_EDGE_INITIALIZED,
new NullEdgeInitializedTransition())
+ .addTransition(VertexState.NEW, VertexState.NEW,
+ VertexEventType.V_ROUTE_EVENT,
+ ROUTE_EVENT_TRANSITION)
+ .addTransition(VertexState.NEW, VertexState.NEW,
+ VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+ SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(VertexState.NEW,
EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -1079,8 +1088,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return recoveredState;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- VertexDataMovementEventsGeneratedEvent vEvent =
- (VertexDataMovementEventsGeneratedEvent) historyEvent;
+ VertexRecoverableEventsGeneratedEvent vEvent =
+ (VertexRecoverableEventsGeneratedEvent) historyEvent;
this.recoveredEvents.addAll(vEvent.getTezEvents());
return recoveredState;
default:
@@ -1774,7 +1783,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
(this.targetVertices != null ?
this.targetVertices.isEmpty() : true),
this.taskResource,
- conContext);
+ conContext,
+ this.stateChangeNotifier);
this.addTask(task);
if(LOG.isDebugEnabled()) {
LOG.debug("Created task for vertex " + logIdentifier + ": " +
@@ -2218,6 +2228,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.getVertexId(), Collections.singletonList(tezEvent), true));
}
continue;
+ } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) {
+ // The event has the relevant target information
+ InputInitializerEvent iiEvent = (InputInitializerEvent) tezEvent.getEvent();
+ iiEvent.setSourceVertexName(vertexName);
+ eventHandler.handle(new VertexEventRouteEvent(
+ getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(),
+ Collections.singletonList(tezEvent), true));
+ continue;
}
Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
@@ -2661,6 +2679,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ vertex.logIdentifier + ". Starting root input initializers: "
+ vertex.inputsWithInitializers.size());
vertex.rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+ vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+ vertex.pendingInitializerEvents.clear();
return VertexState.INITIALIZING;
} else {
boolean hasOneToOneUninitedSource = false;
@@ -2706,6 +2727,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// state. This is handled in RootInputInitializedTransition specially.
vertex.initWaitsForRootInitializers = true;
vertex.rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+ vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+ vertex.pendingInitializerEvents.clear();
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2795,6 +2819,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
// All inputs initialized, shutdown the initializer.
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
// done. check if we need to do the initialization
@@ -3064,6 +3089,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (vertex.rootInputInitializerManager != null) {
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
vertex.finished(VertexState.FAILED,
VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
@@ -3102,6 +3128,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
super.transition(vertex, event);
if (vertex.rootInputInitializerManager != null) {
vertex.rootInputInitializerManager.shutdown();
+ vertex.rootInputInitializerManager = null;
}
}
}
@@ -3146,17 +3173,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " with state: " + completionEvent.getTaskAttemptState()
+ " vertexState: " + vertex.getState());
+
if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
.getTaskAttemptState())) {
vertex.numSuccessSourceAttemptCompletions++;
+
if (vertex.getState() == VertexState.RUNNING) {
+ // Inform the vertex manager about the source task completing.
vertex.vertexManager.onSourceTaskCompleted(completionEvent
.getTaskAttemptId().getTaskID());
} else {
vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
}
}
-
}
}
@@ -3349,7 +3378,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.getAppContext().isRecoveryEnabled()
&& !recovered
&& !tezEvents.isEmpty()) {
- List<TezEvent> dataMovementEvents =
+ List<TezEvent> recoveryEvents =
Lists.newArrayList();
for (TezEvent tezEvent : tezEvents) {
if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
@@ -3357,14 +3386,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
|| tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
- || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
- dataMovementEvents.add(tezEvent);
+ || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
+ || tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+ recoveryEvents.add(tezEvent);
}
}
- if (!dataMovementEvents.isEmpty()) {
- VertexDataMovementEventsGeneratedEvent historyEvent =
- new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
- dataMovementEvents);
+ if (!recoveryEvents.isEmpty()) {
+ VertexRecoverableEventsGeneratedEvent historyEvent =
+ new VertexRecoverableEventsGeneratedEvent(vertex.vertexId,
+ recoveryEvents);
vertex.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
}
@@ -3431,6 +3461,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case VERTEX_MANAGER_EVENT:
{
+ // Restricting VM events to task success only can be done as part of TEZ-1532
VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
@@ -3449,9 +3480,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
- "Event sent to unkown vertex: " + riEvent.getTargetVertexName());
+ "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
+ riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
if (target == vertex) {
- vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
+ if (vertex.rootInputDescriptors == null ||
+ !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
+ throw new TezUncheckedException(
+ "InputInitializerEvent targeted at unknown initializer on vertex " +
+ vertex.logIdentifier + ", Event=" + riEvent);
+ }
+ if (vertex.getState() == VertexState.NEW) {
+ vertex.pendingInitializerEvents.add(tezEvent);
+ } else if (vertex.getState() == VertexState.INITIALIZING) {
+ vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
+ } else {
+ // Currently, INITED and subsequent states mean the initializer has completed or failed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
+ }
+ }
} else {
checkEventSourceMetadata(vertex, sourceMeta);
vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
deleted file mode 100644
index 7ae73be..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.ProtoConverters;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-import com.google.common.collect.Lists;
-
-public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
-
- private static final Log LOG = LogFactory.getLog(
- VertexDataMovementEventsGeneratedEvent.class);
- private List<TezEvent> events;
- private TezVertexID vertexID;
-
- public VertexDataMovementEventsGeneratedEvent(TezVertexID vertexID,
- List<TezEvent> events) {
- this.vertexID = vertexID;
- this.events = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
- EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
- EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
- .contains(event.getEventType())) {
- this.events.add(event);
- }
- }
- if (events.isEmpty()) {
- throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
- + ", no data movement/information events provided");
- }
- }
-
- public VertexDataMovementEventsGeneratedEvent() {
- }
-
- @Override
- public HistoryEventType getEventType() {
- return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
- }
-
- @Override
- public boolean isRecoveryEvent() {
- return true;
- }
-
- @Override
- public boolean isHistoryEvent() {
- return false;
- }
-
- static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
- EventMetaData eventMetaData) {
- RecoveryProtos.EventMetaDataProto.Builder builder =
- RecoveryProtos.EventMetaDataProto.newBuilder()
- .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
- .setEdgeVertexName(eventMetaData.getEdgeVertexName())
- .setTaskVertexName(eventMetaData.getTaskVertexName());
- if (eventMetaData.getTaskAttemptID() != null) {
- builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
- }
- return builder.build();
- }
-
- static EventMetaData convertEventMetaDataFromProto(
- RecoveryProtos.EventMetaDataProto proto) {
- TezTaskAttemptID attemptID = null;
- if (proto.hasTaskAttemptId()) {
- attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
- }
- return new EventMetaData(
- EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
- proto.getTaskVertexName(),
- proto.getEdgeVertexName(),
- attemptID);
- }
-
- public VertexDataMovementEventsGeneratedProto toProto() {
- List<TezDataMovementEventProto> tezEventProtos = null;
- if (events != null) {
- tezEventProtos = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- TezDataMovementEventProto.Builder evtBuilder =
- TezDataMovementEventProto.newBuilder();
- if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
- evtBuilder.setCompositeDataMovementEvent(
- ProtoConverters.convertCompositeDataMovementEventToProto(
- (CompositeDataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
- evtBuilder.setDataMovementEvent(
- ProtoConverters.convertDataMovementEventToProto(
- (DataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
- evtBuilder.setRootInputDataInformationEvent(
- ProtoConverters.convertRootInputDataInformationEventToProto(
- (InputDataInformationEvent) event.getEvent()));
- }
- if (event.getSourceInfo() != null) {
- evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
- }
- if (event.getDestinationInfo() != null) {
- evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
- }
- tezEventProtos.add(evtBuilder.build());
- }
- }
- return VertexDataMovementEventsGeneratedProto.newBuilder()
- .setVertexId(vertexID.toString())
- .addAllTezDataMovementEvent(tezEventProtos)
- .build();
- }
-
- public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
- this.vertexID = TezVertexID.fromString(proto.getVertexId());
- int eventCount = proto.getTezDataMovementEventCount();
- if (eventCount > 0) {
- this.events = Lists.newArrayListWithCapacity(eventCount);
- }
- for (TezDataMovementEventProto eventProto :
- proto.getTezDataMovementEventList()) {
- Event evt = null;
- if (eventProto.hasCompositeDataMovementEvent()) {
- evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
- eventProto.getCompositeDataMovementEvent());
- } else if (eventProto.hasDataMovementEvent()) {
- evt = ProtoConverters.convertDataMovementEventFromProto(
- eventProto.getDataMovementEvent());
- } else if (eventProto.hasRootInputDataInformationEvent()) {
- evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
- eventProto.getRootInputDataInformationEvent());
- }
- EventMetaData sourceInfo = null;
- EventMetaData destinationInfo = null;
- if (eventProto.hasSourceInfo()) {
- sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
- }
- if (eventProto.hasDestinationInfo()) {
- destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
- }
- TezEvent tezEvent = new TezEvent(evt, sourceInfo);
- tezEvent.setDestinationInfo(destinationInfo);
- this.events.add(tezEvent);
- }
- }
-
- @Override
- public void toProtoStream(OutputStream outputStream) throws IOException {
- toProto().writeDelimitedTo(outputStream);
- }
-
- @Override
- public void fromProtoStream(InputStream inputStream) throws IOException {
- VertexDataMovementEventsGeneratedProto proto =
- VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
- if (proto == null) {
- throw new IOException("No data found in stream");
- }
- fromProto(proto);
- }
-
- @Override
- public String toString() {
- return "vertexId=" + vertexID.toString()
- + ", eventCount=" + (events != null ? events.size() : "null");
-
- }
-
- public TezVertexID getVertexID() {
- return this.vertexID;
- }
-
- public List<TezEvent> getTezEvents() {
- return this.events;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
new file mode 100644
index 0000000..a9f1fd2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.collect.Lists;
+
+// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent
+public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
+
+ private static final Log LOG = LogFactory.getLog(
+ VertexRecoverableEventsGeneratedEvent.class);
+ private List<TezEvent> events;
+ private TezVertexID vertexID;
+
+ public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID,
+ List<TezEvent> events) {
+ this.vertexID = vertexID;
+ this.events = Lists.newArrayListWithCapacity(events.size());
+ for (TezEvent event : events) {
+ if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
+ EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
+ EventType.ROOT_INPUT_DATA_INFORMATION_EVENT,
+ EventType.ROOT_INPUT_INITIALIZER_EVENT)
+ .contains(event.getEventType())) {
+ this.events.add(event);
+ }
+ }
+ if (events.isEmpty()) {
+ throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
+ + ", no data movement/information events provided");
+ }
+ }
+
+ public VertexRecoverableEventsGeneratedEvent() {
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return true;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return false;
+ }
+
+ static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
+ EventMetaData eventMetaData) {
+ RecoveryProtos.EventMetaDataProto.Builder builder =
+ RecoveryProtos.EventMetaDataProto.newBuilder()
+ .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
+ .setEdgeVertexName(eventMetaData.getEdgeVertexName())
+ .setTaskVertexName(eventMetaData.getTaskVertexName());
+ if (eventMetaData.getTaskAttemptID() != null) {
+ builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
+ }
+ return builder.build();
+ }
+
+ static EventMetaData convertEventMetaDataFromProto(
+ RecoveryProtos.EventMetaDataProto proto) {
+ TezTaskAttemptID attemptID = null;
+ if (proto.hasTaskAttemptId()) {
+ attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+ }
+ return new EventMetaData(
+ EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
+ proto.getTaskVertexName(),
+ proto.getEdgeVertexName(),
+ attemptID);
+ }
+
+ public VertexDataMovementEventsGeneratedProto toProto() {
+ List<TezDataMovementEventProto> tezEventProtos = null;
+ if (events != null) {
+ tezEventProtos = Lists.newArrayListWithCapacity(events.size());
+ for (TezEvent event : events) {
+ TezDataMovementEventProto.Builder evtBuilder =
+ TezDataMovementEventProto.newBuilder();
+ if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+ evtBuilder.setCompositeDataMovementEvent(
+ ProtoConverters.convertCompositeDataMovementEventToProto(
+ (CompositeDataMovementEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
+ evtBuilder.setDataMovementEvent(
+ ProtoConverters.convertDataMovementEventToProto(
+ (DataMovementEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ evtBuilder.setRootInputDataInformationEvent(
+ ProtoConverters.convertRootInputDataInformationEventToProto(
+ (InputDataInformationEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+ evtBuilder.setInputInitializerEvent(ProtoConverters
+ .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
+ }
+ if (event.getSourceInfo() != null) {
+ evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
+ }
+ if (event.getDestinationInfo() != null) {
+ evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
+ }
+ tezEventProtos.add(evtBuilder.build());
+ }
+ }
+ return VertexDataMovementEventsGeneratedProto.newBuilder()
+ .setVertexId(vertexID.toString())
+ .addAllTezDataMovementEvent(tezEventProtos)
+ .build();
+ }
+
+ public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
+ this.vertexID = TezVertexID.fromString(proto.getVertexId());
+ int eventCount = proto.getTezDataMovementEventCount();
+ if (eventCount > 0) {
+ this.events = Lists.newArrayListWithCapacity(eventCount);
+ }
+ for (TezDataMovementEventProto eventProto :
+ proto.getTezDataMovementEventList()) {
+ Event evt = null;
+ if (eventProto.hasCompositeDataMovementEvent()) {
+ evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
+ eventProto.getCompositeDataMovementEvent());
+ } else if (eventProto.hasDataMovementEvent()) {
+ evt = ProtoConverters.convertDataMovementEventFromProto(
+ eventProto.getDataMovementEvent());
+ } else if (eventProto.hasRootInputDataInformationEvent()) {
+ evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
+ eventProto.getRootInputDataInformationEvent());
+ } else if (eventProto.hasInputInitializerEvent()) {
+ evt = ProtoConverters.convertRootInputInitializerEventFromProto(
+ eventProto.getInputInitializerEvent());
+ }
+ EventMetaData sourceInfo = null;
+ EventMetaData destinationInfo = null;
+ if (eventProto.hasSourceInfo()) {
+ sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
+ }
+ if (eventProto.hasDestinationInfo()) {
+ destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
+ }
+ TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+ tezEvent.setDestinationInfo(destinationInfo);
+ this.events.add(tezEvent);
+ }
+ }
+
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ toProto().writeDelimitedTo(outputStream);
+ }
+
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ VertexDataMovementEventsGeneratedProto proto =
+ VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
+ fromProto(proto);
+ }
+
+ @Override
+ public String toString() {
+ return "vertexId=" + vertexID.toString()
+ + ", eventCount=" + (events != null ? events.size() : "null");
+
+ }
+
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ public List<TezEvent> getTezEvents() {
+ return this.events;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 821612a..93f217f 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -180,6 +180,7 @@ message TezDataMovementEventProto {
optional DataMovementEventProto data_movement_event = 3;
optional CompositeEventProto composite_data_movement_event = 4;
optional RootInputDataInformationEventProto root_input_data_information_event = 5;
+ optional RootInputInitializerEventProto input_initializer_event = 6;
}
message VertexDataMovementEventsGeneratedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index f53643f..63cd9c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import java.util.ArrayList;
import java.util.HashMap;
@@ -53,6 +56,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
@@ -494,6 +498,8 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
+ verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+ eq(mockTask.getLastAttempt().getID().getId()));
eventHandler.events.clear();
// Now fail the attempt after it has succeeded
@@ -548,7 +554,7 @@ public class TestTaskImpl {
private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
private Vertex vertex;
TaskLocationHint locationHint;
-
+
public MockTaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Configuration conf,
TaskAttemptListener taskAttemptListener, Clock clock,
@@ -557,7 +563,7 @@ public class TestTaskImpl {
ContainerContext containerContext, Vertex vertex) {
super(vertexId, partition, eventHandler, conf, taskAttemptListener,
clock, thh, appContext, leafVertex, resource,
- containerContext);
+ containerContext, mock(StateChangeNotifier.class));
this.vertex = vertex;
this.locationHint = locationHint;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index c5153b6..bd13ffe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
@@ -186,7 +187,7 @@ public class TestTaskRecovery {
new Configuration(), mock(TaskAttemptListener.class),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class));
+ mock(ContainerContext.class), mock(StateChangeNotifier.class));
Map<String, OutputCommitter> committers =
new HashMap<String, OutputCommitter>();