You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:54 UTC

[09/25] git commit: TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS. (sseth)

TEZ-1539. Change InputInitializerEvent semantics to
SEND_ONCE_ON_TASK_SUCCESS. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b4580a7b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b4580a7b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b4580a7b

Branch: refs/heads/TEZ-8
Commit: b4580a7b81ab1a5619987296641ebbf50fda4c55
Parents: fb05aac
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 12 02:37:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 12 02:37:11 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../tez/runtime/api/InputInitializer.java       |   5 +-
 .../api/events/InputInitializerEvent.java       |  23 +
 .../org/apache/tez/dag/app/RecoveryParser.java  |   8 +-
 .../app/dag/RootInputInitializerManager.java    | 194 +++++-
 .../tez/dag/app/dag/StateChangeNotifier.java    |  76 ++-
 .../dag/app/dag/TaskStateUpdateListener.java    |  35 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  45 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  75 ++-
 .../VertexDataMovementEventsGeneratedEvent.java | 214 ------
 .../VertexRecoverableEventsGeneratedEvent.java  | 223 +++++++
 tez-dag/src/main/proto/HistoryEvents.proto      |   1 +
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  10 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |   3 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 648 ++++++++++++++++++-
 .../dag/app/dag/impl/TestVertexRecovery.java    |  16 +-
 .../TestHistoryEventsProtoConversion.java       |  10 +-
 .../impl/TestHistoryEventJsonConversion.java    |   4 +-
 .../ats/TestHistoryEventTimelineConversion.java |   4 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |   6 +-
 20 files changed, 1290 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f29d48d..dd20f43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,10 @@ ALL CHANGES:
 
 Release 0.5.1: Unreleased
 
+INCOMPATIBLE CHANGES
+  TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS
+  TEZ-1488. Rename HashComparator to ProxyComparator and implement in TezBytesComparator
+
 ALL CHANGES
   TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput()
   TEZ-1515. Remove usage of ResourceBundles in Counters.

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 00896de..7b22b62 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -93,7 +93,10 @@ public abstract class InputInitializer {
    * State changes will be received based on the registration via {@link
    * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
    * java.util.Set)}. Notifications will be received for all registered state changes, and not just
-   * for the latest state update. They will be in order in which the state change occurred.
+   * for the latest state update. They will be in the order in which the state changes occurred.
+   * <p>
+   * Extensive processing should not be performed via this method call. Instead this should just be
+   * used as a notification mechanism to the main initialization, which is via the initialize method.
    *
    * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
    *                    Additional information may be available for specific events, Look at the

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 3c5e78e..8360447 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.runtime.api.InputInitializer;
@@ -32,6 +33,13 @@ import org.apache.tez.runtime.api.Event;
 /**
  * An event that is routed to the specified {@link InputInitializer}.
  * This can be used to send information/metadata to the {@link InputInitializer}
+ * <p>
+ *
+ * These events are routed to the InputInitializer only after the task which generated the event
+ * succeeds. Also, the events will only be sent once per task, irrespective of how many attempts
+ * were run or succeeded. An example of this is when a task is retried because the node on which it
+ * was running failed. If the task had succeeded once, the event would already have been sent and
+ * will not be resent when the task reruns and succeeds.
  */
 @Unstable
 @Public
@@ -41,6 +49,7 @@ public class InputInitializerEvent extends Event {
   private String targetInputName;
 
   private ByteBuffer eventPayload;
+  private String sourceVertexName;
 
   private InputInitializerEvent(String targetVertexName, String targetInputName,
                                 ByteBuffer eventPayload) {
@@ -88,4 +97,18 @@ public class InputInitializerEvent extends Event {
   public ByteBuffer getUserPayload() {
     return eventPayload == null ? null : eventPayload.asReadOnlyBuffer();
   }
+
+  @InterfaceAudience.Private
+  public void setSourceVertexName(String srcVertexName) {
+    this.sourceVertexName = srcVertexName;
+  }
+
+  /**
+   * Returns the name of the vertex which generated the event. This will only be populated after
+   * the event has been routed by the AM.
+   * @return the name of the source vertex
+   */
+  public String getSourceVertexName() {
+    return this.sourceVertexName;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9ba5847..85851c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -58,7 +58,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -207,7 +207,7 @@ public class RecoveryParser {
         event = new TaskAttemptFinishedEvent();
         break;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-        event = new VertexDataMovementEventsGeneratedEvent();
+        event = new VertexRecoverableEventsGeneratedEvent();
         break;
       default:
         throw new IOException("Invalid data found, unknown event type "
@@ -865,8 +865,8 @@ public class RecoveryParser {
               + ", eventType=" + eventType
               + ", event=" + event.toString());
           assert recoveredDAGData.recoveredDAG != null;
-          VertexDataMovementEventsGeneratedEvent vEvent =
-              (VertexDataMovementEventsGeneratedEvent) event;
+          VertexRecoverableEventsGeneratedEvent vEvent =
+              (VertexRecoverableEventsGeneratedEvent) event;
           Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 770761e..87d4eb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -20,7 +20,10 @@ package org.apache.tez.dag.app.dag;
 
 import javax.annotation.Nullable;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,10 +33,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
@@ -43,10 +49,12 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.*;
 import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
@@ -61,6 +69,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 public class RootInputInitializerManager {
 
@@ -77,7 +86,8 @@ public class RootInputInitializerManager {
   private final Vertex vertex;
   private final AppContext appContext;
 
-  private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
+  @VisibleForTesting
+  final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
 
   public RootInputInitializerManager(Vertex vertex, AppContext appContext,
                                      UserGroupInformation dagUgi, StateChangeNotifier stateTracker) {
@@ -100,7 +110,7 @@ public class RootInputInitializerManager {
       InputInitializer initializer = createInitializer(input, context);
 
       InitializerWrapper initializerWrapper =
-          new InitializerWrapper(input, initializer, context, vertex, entityStateTracker);
+          new InitializerWrapper(input, initializer, context, vertex, entityStateTracker, appContext);
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -117,31 +127,40 @@ public class RootInputInitializerManager {
     return initializer;
   }
 
-  public void handleInitializerEvent(InputInitializerEvent event) {
-    Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
-        "Received event for incorrect vertex");
-    Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
-    InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
-    Preconditions.checkState(initializer != null,
-        "Received event for unknown input : " + event.getTargetInputName());
+  public void handleInitializerEvents(List<TezEvent> events) {
+    ListMultimap<InitializerWrapper, TezEvent> eventMap = LinkedListMultimap.create();
+
+    for (TezEvent tezEvent : events) {
+      Preconditions.checkState(tezEvent.getEvent() instanceof InputInitializerEvent);
+      InputInitializerEvent event = (InputInitializerEvent)tezEvent.getEvent();
+      Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
+          "Received event for incorrect vertex");
+      Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
+      InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
+      Preconditions.checkState(initializer != null,
+          "Received event for unknown input : " + event.getTargetInputName());
+      eventMap.put(initializer, tezEvent);
+    }
+
     // This is a restriction based on current flow - i.e. events generated only by initialize().
     // TODO Rework the flow as per the first comment on TEZ-1076
     if (isStopped) {
       LOG.warn("InitializerManager already stopped for " + vertex.getLogIdentifier() +
-          " Dropping event. [" + event + "]");
-      return;
+          " Dropping " + events.size() + " events");
     }
-    if (initializer.isComplete()) {
-      LOG.warn(
-          "Event targeted at vertex " + vertex.getLogIdentifier() + ", initializerWrapper for Input: " +
-              initializer.getInput().getName() +
-              " will be dropped, since Input has already been initialized. [" + event + "]");
-    }
-    try {
-      initializer.getInitializer().handleInputInitializerEvent(Lists.newArrayList(event));
-    } catch (Exception e) {
-      throw new TezUncheckedException(
-          "Initializer for input: " + event.getTargetInputName() + " failed to process event", e);
+
+    for (Map.Entry<InitializerWrapper, Collection<TezEvent>> entry : eventMap.asMap().entrySet()) {
+      InitializerWrapper initializerWrapper = entry.getKey();
+      if (initializerWrapper.isComplete()) {
+        LOG.warn(entry.getValue().size() +
+            " events targeted at vertex " + vertex.getLogIdentifier() +
+            ", initializerWrapper for Input: " +
+            initializerWrapper.getInput().getName() +
+            " will be dropped, since Input has already been initialized.");
+      } else {
+        initializerWrapper.handleInputInitializerEvents(entry.getValue());
+      }
+
     }
   }
 
@@ -157,7 +176,13 @@ public class RootInputInitializerManager {
   protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
     return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
   }
-  
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public InitializerWrapper getInitializerWrapper(String inputName) {
+    return initializerMap.get(inputName);
+  }
+
   public void shutdown() {
     if (executor != null && !isStopped) {
       // Don't really care about what is running if an error occurs. If no error
@@ -232,7 +257,9 @@ public class RootInputInitializerManager {
     }
   }
 
-  private static class InitializerWrapper implements VertexStateUpdateListener {
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public static class InitializerWrapper implements VertexStateUpdateListener, TaskStateUpdateListener {
 
 
     private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
@@ -242,15 +269,18 @@ public class RootInputInitializerManager {
     private final String vertexLogIdentifier;
     private final StateChangeNotifier stateChangeNotifier;
     private final List<String> notificationRegisteredVertices = Lists.newArrayList();
+    private final AppContext appContext;
 
     InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
                        InputInitializer initializer, InputInitializerContext context,
-                       Vertex vertex, StateChangeNotifier stateChangeNotifier) {
+                       Vertex vertex, StateChangeNotifier stateChangeNotifier,
+                       AppContext appContext) {
       this.input = input;
       this.initializer = initializer;
       this.context = context;
       this.vertexLogIdentifier = vertex.getLogIdentifier();
       this.stateChangeNotifier = stateChangeNotifier;
+      this.appContext = appContext;
     }
 
     public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
@@ -272,6 +302,7 @@ public class RootInputInitializerManager {
     public void setComplete() {
       this.isComplete.set(true);
       unregisterForVertexStatusUpdates();
+      unregisterForTaskStatusUpdates();
     }
 
     public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
@@ -302,7 +333,118 @@ public class RootInputInitializerManager {
         initializer.onVertexStateUpdated(event);
       }
     }
-  }
 
+    private final Map<String, Map<Integer, Integer>> firstSuccessfulAttemptMap = new HashMap<String, Map<Integer, Integer>>();
+    private final ListMultimap<String, TezEvent> pendingEvents = LinkedListMultimap.create();
+    private final List<String> taskNotificationRegisteredVertices = Lists.newLinkedList();
+
+    @InterfaceAudience.Private
+    @VisibleForTesting
+    public Map<String, Map<Integer, Integer>> getFirstSuccessfulAttemptMap() {
+      return this.firstSuccessfulAttemptMap;
+    }
+
+    @InterfaceAudience.Private
+    @VisibleForTesting
+    public ListMultimap<String, TezEvent> getPendingEvents() {
+      return this.pendingEvents;
+    }
+
+    @Override
+    public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+      // Notifications will only start coming in after an event is received, which is when we register for notifications.
+      // TODO TEZ-1577. Get rid of this.
+      if (attemptId == -1) {
+        throw new TezUncheckedException(
+            "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
+      }
+      Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
+      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId);
+      if (successfulAttempt == null) {
+        successfulAttempt = attemptId;
+        vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);
+      }
+
+      // Run through all the pending events for this srcVertex to see if any of them need to be dispatched.
+      List<TezEvent> events = pendingEvents.get(vertexName);
+      if (events != null && !events.isEmpty()) {
+        List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+        Iterator<TezEvent> eventIterator = events.iterator();
+        while (eventIterator.hasNext()) {
+          TezEvent tezEvent = eventIterator.next();
+          int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+          int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+          if (taskIndex == taskId.getId()) {
+            // Process only if there's a pending event for the specific succeeded task
+            if (taskAttemptIndex == successfulAttempt) {
+              toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+            }
+            eventIterator.remove();
+          }
+        }
+        sendEvents(toForwardEvents);
+      }
+    }
+
+    public void handleInputInitializerEvents(Collection<TezEvent> tezEvents) {
+      List<InputInitializerEvent> toForwardEvents = new LinkedList<InputInitializerEvent>();
+      for (TezEvent tezEvent : tezEvents) {
+        String srcVertexName = tezEvent.getSourceInfo().getTaskVertexName();
+        int taskIndex = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
+        int taskAttemptIndex = tezEvent.getSourceInfo().getTaskAttemptID().getId();
+
+        Map<Integer, Integer> vertexSuccessfulAttemptMap =
+            firstSuccessfulAttemptMap.get(srcVertexName);
+        if (vertexSuccessfulAttemptMap == null) {
+          vertexSuccessfulAttemptMap = new HashMap<Integer, Integer>();
+          firstSuccessfulAttemptMap.put(srcVertexName, vertexSuccessfulAttemptMap);
+          // Seen first time. Register for task updates
+          stateChangeNotifier.registerForTaskSuccessUpdates(srcVertexName, this);
+          taskNotificationRegisteredVertices.add(srcVertexName);
+        }
+
+        // Determine the successful attempt for the task
+        Integer successfulAttemptInteger = vertexSuccessfulAttemptMap.get(taskIndex);
+        if (successfulAttemptInteger == null) {
+          // Check immediately if this task has succeeded, in case the notification came in before the event
+          Vertex srcVertex = appContext.getCurrentDAG().getVertex(srcVertexName);
+          Task task = srcVertex.getTask(taskIndex);
+          if (task.getState() == TaskState.SUCCEEDED) {
+            successfulAttemptInteger = task.getSuccessfulAttempt().getID().getId();
+            vertexSuccessfulAttemptMap.put(taskIndex, successfulAttemptInteger);
+          }
+        }
+
+        if (successfulAttemptInteger == null) {
+          // Queue events and await a notification
+          pendingEvents.put(srcVertexName, tezEvent);
+        } else {
+          // Handle the event immediately.
+          if (taskAttemptIndex == successfulAttemptInteger) {
+            toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
+          } // Otherwise the event can be dropped
+        }
+      }
+      sendEvents(toForwardEvents);
+    }
+
+    private void sendEvents(List<InputInitializerEvent> events) {
+      if (events != null && !events.isEmpty()) {
+        try {
+          initializer.handleInputInitializerEvent(events);
+        } catch (Exception e) {
+          throw new TezUncheckedException(
+              "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() +
+                  " failed to process events", e);
+        }
+      }
+    }
+
+    private void unregisterForTaskStatusUpdates() {
+      for (String vertexName : taskNotificationRegisteredVertices) {
+        stateChangeNotifier.unregisterForTaskSuccessUpdates(vertexName, this);
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 558fc61..dc18e9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -33,6 +33,7 @@ import com.google.common.collect.SetMultimap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 
 /**
@@ -55,13 +56,11 @@ public class StateChangeNotifier {
     this.lastKnowStatesMap = LinkedListMultimap.create();
   }
 
+  // -------------- VERTEX STATE CHANGE SECTION ---------------
   public void registerForVertexUpdates(String vertexName,
                                        Set<org.apache.tez.dag.api.event.VertexState> stateSet,
                                        VertexStateUpdateListener listener) {
-    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
-    Vertex vertex = dag.getVertex(vertexName);
-    Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
-    TezVertexID vertexId = vertex.getVertexId();
+    TezVertexID vertexId = validateAndGetVertexId(vertexName);
     writeLock.lock();
     // Read within the lock, to ensure a consistent view is seen.
     List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
@@ -88,14 +87,8 @@ public class StateChangeNotifier {
     }
   }
 
-  // KKK Send out current state.
-
-
   public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
-    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
-    Vertex vertex = dag.getVertex(vertexName);
-    Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
-    TezVertexID vertexId = vertex.getVertexId();
+    TezVertexID vertexId = validateAndGetVertexId(vertexName);
     writeLock.lock();
     try {
       ListenerContainer listenerContainer = new ListenerContainer(listener, null);
@@ -125,6 +118,7 @@ public class StateChangeNotifier {
 
   }
 
+
   private static final class ListenerContainer {
     final VertexStateUpdateListener listener;
     final Set<org.apache.tez.dag.api.event.VertexState> states;
@@ -165,4 +159,64 @@ public class StateChangeNotifier {
       return System.identityHashCode(listener);
     }
   }
+
+  // -------------- END OF VERTEX STATE CHANGE SECTION ---------------
+
+  // -------------- TASK STATE CHANGE SECTION ---------------
+
+  // Task updates are not buffered, to avoid storing unnecessary information.
+  // Components (non user-facing) which use this will only receive notifications after registration.
+  // They must query task states themselves to learn about tasks which succeeded before registering.
+  // Currently only Task SUCCESS events are handled.
+  private final SetMultimap<TezVertexID, TaskStateUpdateListener> taskListeners =
+      Multimaps.synchronizedSetMultimap(HashMultimap.<TezVertexID, TaskStateUpdateListener>create());
+  private final ReentrantReadWriteLock taskListenerLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock.ReadLock taskReadLock = taskListenerLock.readLock();
+  private final ReentrantReadWriteLock.WriteLock taskWriteLock = taskListenerLock.writeLock();
+
+
+
+  public void registerForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+    TezVertexID vertexId = validateAndGetVertexId(vertexName);
+    Preconditions.checkNotNull(listener, "listener cannot be null");
+    taskWriteLock.lock();
+    try {
+      taskListeners.put(vertexId, listener);
+    } finally {
+      taskWriteLock.unlock();
+    }
+  }
+
+  public void unregisterForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
+    TezVertexID vertexId = validateAndGetVertexId(vertexName);
+    Preconditions.checkNotNull(listener, "listener cannot be null");
+    taskWriteLock.lock();
+    try {
+      taskListeners.remove(vertexId, listener);
+    } finally {
+      taskWriteLock.unlock();
+    }
+  }
+
+  public void taskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
+    taskReadLock.lock();
+    try {
+      for (TaskStateUpdateListener listener : taskListeners.get(taskId.getVertexID())) {
+        listener.onTaskSucceeded(vertexName, taskId, attemptId);
+      }
+    } finally {
+      taskReadLock.unlock();
+    }
+  }
+
+  // -------------- END OF TASK STATE CHANGE SECTION ---------------
+
+
+  private TezVertexID validateAndGetVertexId(String vertexName) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+    Vertex vertex = dag.getVertex(vertexName);
+    Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+    return vertex.getVertexId();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
new file mode 100644
index 0000000..7c86991
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateUpdateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+@InterfaceAudience.Private
+/**
+ * This interface should not be implemented by user-facing APIs such as InputInitializer.
+ */
+public interface TaskStateUpdateListener {
+
+  // Internal usage only. Currently only supporting onSuccess notifications for tasks.
+  // Exposing the taskID is ok, since this isn't public
+  public void onTaskSucceeded(String vertexName, TezTaskID taskId, int attemptId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1dd711b..976f10f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
@@ -53,6 +53,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -88,6 +89,8 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
 
 /**
  * Implementation of Task interface.
@@ -118,6 +121,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final ContainerContext containerContext;
   @VisibleForTesting
   long scheduledTime;
+  final StateChangeNotifier stateChangeNotifier;
 
   private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
   private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
@@ -138,6 +142,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // Recovery related flags
   boolean recoveryStartEventSeen = false;
 
+  private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
+
   private static final StateMachineFactory
                <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
             stateMachineFactory
@@ -261,7 +267,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // create the topology tables
     .installTopology();
 
-  private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
+  private void augmentStateMachine() {
+    // Have STATE_CHANGED_CALLBACK invoked whenever this task enters SUCCEEDED.
+    stateMachine.registerStateEnteredCallback(
+        TaskStateInternal.SUCCEEDED, STATE_CHANGED_CALLBACK);
+  }
+
+  private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
     stateMachine;
 
   // TODO: Recovery
@@ -318,7 +330,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskAttemptListener taskAttemptListener,
       Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
       boolean leafVertex, Resource resource,
-      ContainerContext containerContext) {
+      ContainerContext containerContext,
+      StateChangeNotifier stateChangeNotifier) {
     this.conf = conf;
     this.clock = clock;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -333,11 +346,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
     this.appContext = appContext;
+    this.stateChangeNotifier = stateChangeNotifier;
 
     this.leafVertex = leafVertex;
     this.taskResource = resource;
     this.containerContext = containerContext;
-    stateMachine = stateMachineFactory.make(this);
+    stateMachine = new StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>(
+        stateMachineFactory.make(this), this);
+    augmentStateMachine();
   }
 
   @Override
@@ -1423,6 +1439,27 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  /** Relays a task's entry into SUCCEEDED to its StateChangeNotifier. */
+  private static class TaskStateChangedCallback
+      implements OnStateChangedCallback<TaskStateInternal, TaskImpl> {
+
+    @Override
+    public void onStateChanged(TaskImpl task, TaskStateInternal taskStateInternal) {
+      // Only registered for SUCCEEDED notifications at the moment
+      Preconditions.checkState(taskStateInternal == TaskStateInternal.SUCCEEDED);
+      // TODO TEZ-1577.
+      // This is a horrible hack to get around recovery issues. Without this, recovery would fail
+      // for successful vertices.
+      // With this, recovery will end up failing for DAGs making use of InputInitializerEvents
+      // -1 is passed as the attempt id when there is no successful attempt.
+      TaskAttempt winner = task.getSuccessfulAttempt();
+      int winnerAttemptId =
+          (winner == null) ? -1 : winner.getID().getId();
+      task.stateChangeNotifier.taskSucceeded(
+          task.getVertex().getName(), task.getTaskId(), winnerAttemptId);
+    }
+  }
+
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
     if (commitAttempt != null && commitAttempt.equals(attempt)) {
       LOG.info("Removing commit attempt: " + commitAttempt);

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6437e5b..594c651 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -127,7 +127,7 @@ import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
@@ -244,6 +244,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
   private boolean vertexAlreadyInitialized = false;
 
+  @VisibleForTesting
+  final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
        stateMachineFactory
@@ -261,6 +264,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 EnumSet.of(VertexState.NEW),
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   new NullEdgeInitializedTransition())
+          .addTransition(VertexState.NEW, VertexState.NEW,
+                VertexEventType.V_ROUTE_EVENT,
+                ROUTE_EVENT_TRANSITION)
+          .addTransition(VertexState.NEW,  VertexState.NEW,
+                VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+                SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (VertexState.NEW,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -1079,8 +1088,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         return recoveredState;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-        VertexDataMovementEventsGeneratedEvent vEvent =
-            (VertexDataMovementEventsGeneratedEvent) historyEvent;
+        VertexRecoverableEventsGeneratedEvent vEvent =
+            (VertexRecoverableEventsGeneratedEvent) historyEvent;
         this.recoveredEvents.addAll(vEvent.getTezEvents());
         return recoveredState;
       default:
@@ -1774,7 +1783,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               (this.targetVertices != null ?
                 this.targetVertices.isEmpty() : true),
               this.taskResource,
-              conContext);
+              conContext,
+              this.stateChangeNotifier);
       this.addTask(task);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Created task for vertex " + logIdentifier + ": " +
@@ -2218,6 +2228,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               this.getVertexId(), Collections.singletonList(tezEvent), true));
         }
         continue;
+      } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) {
+        // The event has the relevant target information
+        InputInitializerEvent iiEvent = (InputInitializerEvent) tezEvent.getEvent();
+        iiEvent.setSourceVertexName(vertexName);
+        eventHandler.handle(new VertexEventRouteEvent(
+            getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(),
+            Collections.singletonList(tezEvent), true));
+        continue;
       }
 
       Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
@@ -2661,6 +2679,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               + vertex.logIdentifier + ". Starting root input initializers: "
               + vertex.inputsWithInitializers.size());
           vertex.rootInputInitializerManager.runInputInitializers(inputList);
+          // Send pending rootInputInitializerEvents
+          vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+          vertex.pendingInitializerEvents.clear();
           return VertexState.INITIALIZING;
         } else {
           boolean hasOneToOneUninitedSource = false;
@@ -2706,6 +2727,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // state. This is handled in RootInputInitializedTransition specially.
           vertex.initWaitsForRootInitializers = true;
           vertex.rootInputInitializerManager.runInputInitializers(inputList);
+          // Send pending rootInputInitializerEvents
+          vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
+          vertex.pendingInitializerEvents.clear();
           return VertexState.INITIALIZING;
         }
         if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2795,6 +2819,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
         // All inputs initialized, shutdown the initializer.
         vertex.rootInputInitializerManager.shutdown();
+        vertex.rootInputInitializerManager = null;
       }
 
       // done. check if we need to do the initialization
@@ -3064,6 +3089,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
       if (vertex.rootInputInitializerManager != null) {
         vertex.rootInputInitializerManager.shutdown();
+        vertex.rootInputInitializerManager = null;
       }
       vertex.finished(VertexState.FAILED,
           VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
@@ -3102,6 +3128,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       super.transition(vertex, event);
       if (vertex.rootInputInitializerManager != null) {
         vertex.rootInputInitializerManager.shutdown();
+        vertex.rootInputInitializerManager = null;
       }
     }
   }
@@ -3146,17 +3173,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + " with state: " + completionEvent.getTaskAttemptState()
             + " vertexState: " + vertex.getState());
 
+
       if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
           .getTaskAttemptState())) {
         vertex.numSuccessSourceAttemptCompletions++;
+
         if (vertex.getState() == VertexState.RUNNING) {
+          // Inform the vertex manager about the source task completing.
           vertex.vertexManager.onSourceTaskCompleted(completionEvent
               .getTaskAttemptId().getTaskID());
         } else {
           vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
         }
       }
-
     }
   }
 
@@ -3349,7 +3378,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (vertex.getAppContext().isRecoveryEnabled()
           && !recovered
           && !tezEvents.isEmpty()) {
-        List<TezEvent> dataMovementEvents =
+        List<TezEvent> recoveryEvents =
             Lists.newArrayList();
         for (TezEvent tezEvent : tezEvents) {
           if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
@@ -3357,14 +3386,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           if  (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
             || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
-            || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
-            dataMovementEvents.add(tezEvent);
+            || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
+            || tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+            recoveryEvents.add(tezEvent);
           }
         }
-        if (!dataMovementEvents.isEmpty()) {
-          VertexDataMovementEventsGeneratedEvent historyEvent =
-              new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
-                  dataMovementEvents);
+        if (!recoveryEvents.isEmpty()) {
+          VertexRecoverableEventsGeneratedEvent historyEvent =
+              new VertexRecoverableEventsGeneratedEvent(vertex.vertexId,
+                  recoveryEvents);
           vertex.appContext.getHistoryHandler().handle(
               new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
         }
@@ -3431,6 +3461,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case VERTEX_MANAGER_EVENT:
         {
+          // VM events on task success only can be changed as part of TEZ-1532
           VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
           Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
           Preconditions.checkArgument(target != null,
@@ -3449,9 +3480,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
           Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
           Preconditions.checkArgument(target != null,
-              "Event sent to unkown vertex: " + riEvent.getTargetVertexName());
+              "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
+          riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
           if (target == vertex) {
-            vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
+            if (vertex.rootInputDescriptors == null ||
+                !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
+              throw new TezUncheckedException(
+                  "InputInitializerEvent targeted at unknown initializer on vertex " +
+                      vertex.logIdentifier + ", Event=" + riEvent);
+            }
+            if (vertex.getState() == VertexState.NEW) {
+              vertex.pendingInitializerEvents.add(tezEvent);
+            } else  if (vertex.getState() == VertexState.INITIALIZING) {
+              vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
+            } else {
+              // Currently, INITED and subsequent states means Initializer complete / failure
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
+              }
+            }
           } else {
             checkEventSourceMetadata(vertex, sourceMeta);
             vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
deleted file mode 100644
index 7ae73be..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.ProtoConverters;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-import com.google.common.collect.Lists;
-
-public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
-
-  private static final Log LOG = LogFactory.getLog(
-      VertexDataMovementEventsGeneratedEvent.class);
-  private List<TezEvent> events;
-  private TezVertexID vertexID;
-
-  public VertexDataMovementEventsGeneratedEvent(TezVertexID vertexID,
-      List<TezEvent> events) {
-    this.vertexID = vertexID;
-    this.events = Lists.newArrayListWithCapacity(events.size());
-    for (TezEvent event : events) {
-      if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
-          EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
-          EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
-              .contains(event.getEventType())) {
-        this.events.add(event);
-      }
-    }
-    if (events.isEmpty()) {
-      throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
-        + ", no data movement/information events provided");
-    }
-  }
-
-  public VertexDataMovementEventsGeneratedEvent() {
-  }
-
-  @Override
-  public HistoryEventType getEventType() {
-    return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
-  }
-
-  @Override
-  public boolean isRecoveryEvent() {
-    return true;
-  }
-
-  @Override
-  public boolean isHistoryEvent() {
-    return false;
-  }
-
-  static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
-      EventMetaData eventMetaData) {
-    RecoveryProtos.EventMetaDataProto.Builder builder =
-        RecoveryProtos.EventMetaDataProto.newBuilder()
-        .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
-        .setEdgeVertexName(eventMetaData.getEdgeVertexName())
-        .setTaskVertexName(eventMetaData.getTaskVertexName());
-    if (eventMetaData.getTaskAttemptID() != null) {
-        builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
-    }
-    return builder.build();
-  }
-
-  static EventMetaData convertEventMetaDataFromProto(
-      RecoveryProtos.EventMetaDataProto proto) {
-    TezTaskAttemptID attemptID = null;
-    if (proto.hasTaskAttemptId()) {
-      attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
-    }
-    return new EventMetaData(
-        EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
-        proto.getTaskVertexName(),
-        proto.getEdgeVertexName(),
-        attemptID);
-  }
-
-  public VertexDataMovementEventsGeneratedProto toProto() {
-    List<TezDataMovementEventProto> tezEventProtos = null;
-    if (events != null) {
-      tezEventProtos = Lists.newArrayListWithCapacity(events.size());
-      for (TezEvent event : events) {
-        TezDataMovementEventProto.Builder evtBuilder =
-            TezDataMovementEventProto.newBuilder();
-        if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
-          evtBuilder.setCompositeDataMovementEvent(
-              ProtoConverters.convertCompositeDataMovementEventToProto(
-                  (CompositeDataMovementEvent) event.getEvent()));
-        } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
-          evtBuilder.setDataMovementEvent(
-              ProtoConverters.convertDataMovementEventToProto(
-                  (DataMovementEvent) event.getEvent()));
-        } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
-          evtBuilder.setRootInputDataInformationEvent(
-              ProtoConverters.convertRootInputDataInformationEventToProto(
-                  (InputDataInformationEvent) event.getEvent()));
-        }
-        if (event.getSourceInfo() != null) {
-          evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
-        }
-        if (event.getDestinationInfo() != null) {
-          evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
-        }
-        tezEventProtos.add(evtBuilder.build());
-      }
-    }
-    return VertexDataMovementEventsGeneratedProto.newBuilder()
-        .setVertexId(vertexID.toString())
-        .addAllTezDataMovementEvent(tezEventProtos)
-        .build();
-  }
-
-  public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
-    this.vertexID = TezVertexID.fromString(proto.getVertexId());
-    int eventCount = proto.getTezDataMovementEventCount();
-    if (eventCount > 0) {
-      this.events = Lists.newArrayListWithCapacity(eventCount);
-    }
-    for (TezDataMovementEventProto eventProto :
-        proto.getTezDataMovementEventList()) {
-      Event evt = null;
-      if (eventProto.hasCompositeDataMovementEvent()) {
-        evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
-            eventProto.getCompositeDataMovementEvent());
-      } else if (eventProto.hasDataMovementEvent()) {
-        evt = ProtoConverters.convertDataMovementEventFromProto(
-            eventProto.getDataMovementEvent());
-      } else if (eventProto.hasRootInputDataInformationEvent()) {
-        evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
-            eventProto.getRootInputDataInformationEvent());
-      }
-      EventMetaData sourceInfo = null;
-      EventMetaData destinationInfo = null;
-      if (eventProto.hasSourceInfo()) {
-        sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
-      }
-      if (eventProto.hasDestinationInfo()) {
-        destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
-      }
-      TezEvent tezEvent = new TezEvent(evt, sourceInfo);
-      tezEvent.setDestinationInfo(destinationInfo);
-      this.events.add(tezEvent);
-    }
-  }
-
-  @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
-  }
-
-  @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexDataMovementEventsGeneratedProto proto =
-        VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
-    if (proto == null) {
-      throw new IOException("No data found in stream");
-    }
-    fromProto(proto);
-  }
-
-  @Override
-  public String toString() {
-    return "vertexId=" + vertexID.toString()
-        + ", eventCount=" + (events != null ? events.size() : "null");
-
-  }
-
-  public TezVertexID getVertexID() {
-    return this.vertexID;
-  }
-
-  public List<TezEvent> getTezEvents() {
-    return this.events;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
new file mode 100644
index 0000000..a9f1fd2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.collect.Lists;
+
+// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent
+public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
+
+  private static final Log LOG = LogFactory.getLog(
+      VertexRecoverableEventsGeneratedEvent.class);
+  private List<TezEvent> events;
+  private TezVertexID vertexID;
+
+  /** Keeps only the recoverable event types; throws RuntimeException if none remain. */
+  public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID, List<TezEvent> events) {
+    EnumSet<EventType> recoverableTypes = EnumSet.of(
+        EventType.DATA_MOVEMENT_EVENT, EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
+        EventType.ROOT_INPUT_DATA_INFORMATION_EVENT, EventType.ROOT_INPUT_INITIALIZER_EVENT);
+    this.vertexID = vertexID;
+    this.events = Lists.newArrayListWithCapacity(events.size());
+    for (TezEvent event : events) {
+      if (recoverableTypes.contains(event.getEventType())) {
+        this.events.add(event);
+      }
+    }
+    // Check the filtered list: the parameter can be non-empty yet hold nothing recoverable.
+    if (this.events.isEmpty()) {
+      throw new RuntimeException("Invalid creation of VertexRecoverableEventsGeneratedEvent"
+        + ", no data movement/information/initializer events provided");
+    }
+  }
+
+  public VertexRecoverableEventsGeneratedEvent() {
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
+      EventMetaData eventMetaData) {
+    RecoveryProtos.EventMetaDataProto.Builder builder =
+        RecoveryProtos.EventMetaDataProto.newBuilder()
+        .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
+        .setEdgeVertexName(eventMetaData.getEdgeVertexName())
+        .setTaskVertexName(eventMetaData.getTaskVertexName());
+    if (eventMetaData.getTaskAttemptID() != null) {
+        builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
+    }
+    return builder.build();
+  }
+
+  /** Rebuilds event metadata from proto; the attempt id stays null when the proto has none. */
+  static EventMetaData convertEventMetaDataFromProto(
+      RecoveryProtos.EventMetaDataProto proto) {
+    TezTaskAttemptID restoredAttemptId = proto.hasTaskAttemptId()
+        ? TezTaskAttemptID.fromString(proto.getTaskAttemptId())
+        : null;
+    // The stored integer is used as an index into the enum's values() array.
+    EventMetaData.EventProducerConsumerType generatorType =
+        EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()];
+    return new EventMetaData(generatorType, proto.getTaskVertexName(),
+        proto.getEdgeVertexName(), restoredAttemptId);
+  }
+
+  /** Serializes the vertex id and events; yields an empty event list when none are set. */
+  public VertexDataMovementEventsGeneratedProto toProto() {
+    // Never null: passing null to addAllTezDataMovementEvent below would throw.
+    List<TezDataMovementEventProto> tezEventProtos = Lists.newArrayList();
+    if (events != null) {
+      tezEventProtos = Lists.newArrayListWithCapacity(events.size());
+      for (TezEvent event : events) {
+        TezDataMovementEventProto.Builder evtBuilder =
+            TezDataMovementEventProto.newBuilder();
+        if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+          evtBuilder.setCompositeDataMovementEvent(
+              ProtoConverters.convertCompositeDataMovementEventToProto(
+                  (CompositeDataMovementEvent) event.getEvent()));
+        } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
+          evtBuilder.setDataMovementEvent(
+              ProtoConverters.convertDataMovementEventToProto(
+                  (DataMovementEvent) event.getEvent()));
+        } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+          evtBuilder.setRootInputDataInformationEvent(
+              ProtoConverters.convertRootInputDataInformationEventToProto(
+                  (InputDataInformationEvent) event.getEvent()));
+        } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+          evtBuilder.setInputInitializerEvent(ProtoConverters
+              .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
+        }
+        if (event.getSourceInfo() != null) {
+          evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
+        }
+        if (event.getDestinationInfo() != null) {
+          evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
+        }
+        tezEventProtos.add(evtBuilder.build());
+      }
+    }
+    return VertexDataMovementEventsGeneratedProto.newBuilder().setVertexId(vertexID.toString())
+        .addAllTezDataMovementEvent(tezEventProtos).build();
+  }
+
+  /** Populates this event from its proto form; the event list is never left null. */
+  public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    int eventCount = proto.getTezDataMovementEventCount();
+    // Allocate even for zero events so callers doing addAll(getTezEvents()) never see null.
+    this.events = Lists.newArrayListWithCapacity(eventCount);
+    for (TezDataMovementEventProto eventProto :
+        proto.getTezDataMovementEventList()) {
+      Event evt = null;
+      if (eventProto.hasCompositeDataMovementEvent()) {
+        evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
+            eventProto.getCompositeDataMovementEvent());
+      } else if (eventProto.hasDataMovementEvent()) {
+        evt = ProtoConverters.convertDataMovementEventFromProto(
+            eventProto.getDataMovementEvent());
+      } else if (eventProto.hasRootInputDataInformationEvent()) {
+        evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
+            eventProto.getRootInputDataInformationEvent());
+      } else if (eventProto.hasInputInitializerEvent()) {
+        evt = ProtoConverters.convertRootInputInitializerEventFromProto(
+            eventProto.getInputInitializerEvent());
+      }
+      EventMetaData sourceInfo = null;
+      EventMetaData destinationInfo = null;
+      if (eventProto.hasSourceInfo()) {
+        sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
+      }
+      if (eventProto.hasDestinationInfo()) {
+        destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
+      }
+      TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+      tezEvent.setDestinationInfo(destinationInfo);
+      this.events.add(tezEvent);
+    }
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexDataMovementEventsGeneratedProto proto =
+        VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    // Plain concatenation (no explicit toString()) stays safe before vertexID is populated.
+    return "vertexId=" + vertexID
+        + ", eventCount=" + (events != null ? events.size() : "null");
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public List<TezEvent> getTezEvents() {
+    return this.events;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 821612a..93f217f 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -180,6 +180,7 @@ message TezDataMovementEventProto {
   optional DataMovementEventProto data_movement_event = 3;
   optional CompositeEventProto composite_data_movement_event = 4;
   optional RootInputDataInformationEventProto root_input_data_information_event = 5;
+  optional RootInputInitializerEventProto input_initializer_event = 6;
 }
 
 message VertexDataMovementEventsGeneratedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index f53643f..63cd9c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,6 +56,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -494,6 +498,8 @@ public class TestTaskImpl {
 
     // The task should now have succeeded
     assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(mockTask.getLastAttempt().getID().getId()));
 
     eventHandler.events.clear();
     // Now fail the attempt after it has succeeded
@@ -548,7 +554,7 @@ public class TestTaskImpl {
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
     private Vertex vertex;
     TaskLocationHint locationHint;
-    
+
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
         TaskAttemptListener taskAttemptListener, Clock clock,
@@ -557,7 +563,7 @@ public class TestTaskImpl {
         ContainerContext containerContext, Vertex vertex) {
       super(vertexId, partition, eventHandler, conf, taskAttemptListener,
           clock, thh, appContext, leafVertex, resource,
-          containerContext);
+          containerContext, mock(StateChangeNotifier.class));
       this.vertex = vertex;
       this.locationHint = locationHint;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/b4580a7b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index c5153b6..bd13ffe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -186,7 +187,7 @@ public class TestTaskRecovery {
             new Configuration(), mock(TaskAttemptListener.class),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class));
+            mock(ContainerContext.class), mock(StateChangeNotifier.class));
 
     Map<String, OutputCommitter> committers =
         new HashMap<String, OutputCommitter>();