You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/09/08 05:24:03 UTC

[1/2] git commit: TEZ-1447. Provide a mechanism for InputInitializers to know about Vertex state changes. (sseth) (cherry picked from commit 9085e7bb4d3d9a5c1a91aac48048da8127cee17a)

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 ca86b6e36 -> d9ea43537


TEZ-1447. Provide a mechanism for InputInitializers to know about
Vertex state changes. (sseth)
(cherry picked from commit 9085e7bb4d3d9a5c1a91aac48048da8127cee17a)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/88acd71c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/88acd71c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/88acd71c

Branch: refs/heads/branch-0.5
Commit: 88acd71c49d02eabd746ffc3a554e10ccd65fc02
Parents: ca86b6e
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Sep 7 20:21:45 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Sep 7 20:23:43 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/event/VertexState.java   |  51 ++++
 .../tez/dag/api/event/VertexStateUpdate.java    |  60 +++++
 .../VertexStateUpdateParallelismUpdated.java    |  50 ++++
 .../tez/runtime/api/InputInitializer.java       |  17 +-
 .../runtime/api/InputInitializerContext.java    |  19 +-
 .../app/dag/RootInputInitializerManager.java    |  59 ++++-
 .../tez/dag/app/dag/StateChangeNotifier.java    | 168 ++++++++++++
 .../dag/app/dag/VertexStateUpdateListener.java  |  30 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +-
 .../TezRootInputInitializerContextImpl.java     |  16 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  83 +++++-
 .../tez/state/OnStateChangedCallback.java       |  24 ++
 .../org/apache/tez/state/StateMachineTez.java   |  66 +++++
 .../dag/app/dag/TestStateChangeNotifier.java    | 263 +++++++++++++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 102 +++++--
 .../common/TestMRInputSplitDistributor.java     |   9 +
 16 files changed, 991 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
new file mode 100644
index 0000000..ab296a5
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Vertex states which are reported to user code via {@link VertexStateUpdate}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum VertexState {
+  /**
+   * Indicates that the Vertex has entered the SUCCEEDED state. A vertex can go back into the RUNNING state after succeeding.
+   */
+  SUCCEEDED,
+  /**
+   * Indicates that the Vertex has entered the RUNNING state. This state can be reached after SUCCEEDED, if some
+   * tasks belonging to the vertex are restarted due to errors.
+   */
+  RUNNING,
+  /**
+   * Indicates that the Vertex has FAILED.
+   */
+  FAILED,
+  /**
+   * Indicates that the Vertex has been KILLED.
+   */
+  KILLED,
+  /**
+   * Indicates that the parallelism for the vertex has changed.
+   */
+  PARALLELISM_UPDATED
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
new file mode 100644
index 0000000..5b7ca40
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Updates that are sent to user code running within the AM, on Vertex state changes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class VertexStateUpdate {
+
+  private final String vertexName;
+  private final VertexState vertexState;
+
+  /** Creates an update carrying the name of the vertex and its updated state. */
+  public VertexStateUpdate(String vertexName, VertexState vertexState) {
+    this.vertexName = vertexName;
+    this.vertexState = vertexState;
+  }
+
+  /**
+   * Get the name of the vertex for which the state has changed.
+   * @return the name of the vertex
+   */
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  /**
+   * Get the updated state.
+   * @return the updated state
+   */
+  public VertexState getVertexState() {
+    return vertexState;
+  }
+
+  @Override
+  public String toString() {
+    return "VertexStateUpdate: vertexName=" + vertexName + ", State=" + vertexState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
new file mode 100644
index 0000000..54fa87c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+/**
+ * A {@link VertexStateUpdate} that is sent out when the parallelism of a vertex changes.
+ */
+public class VertexStateUpdateParallelismUpdated extends VertexStateUpdate {
+
+  private final int parallelism;
+  private final int previousParallelism;
+  public VertexStateUpdateParallelismUpdated(String vertexName,
+                                             int updatedParallelism, int previousParallelism) {
+    super(vertexName, VertexState.PARALLELISM_UPDATED); // the state is fixed for this update type
+    this.parallelism = updatedParallelism;
+    this.previousParallelism = previousParallelism;
+  }
+
+  /**
+   * Returns the new parallelism for the vertex.
+   * @return the new parallelism
+   */
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  /**
+   * Returns the previous value of the parallelism.
+   * @return the previous parallelism
+   */
+  public int getPreviousParallelism() {
+    return previousParallelism;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 09268d9..3ab5cdb 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
 /**
@@ -85,5 +86,19 @@ public abstract class InputInitializer {
   public final InputInitializerContext getContext() {
     return this.initializerContext;
   }
-  
+
+  /**
+   * Receive notifications on vertex state changes.
+   * <p/>
+   * State changes will be received based on the registration via {@link
+   * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStatusUpdates(String,
+   * java.util.Set)}. Notifications will be received for all registered state changes, and not just
+   * for the latest state update. They will be in the order in which the state changes occurred.
+   *
+   * @param stateUpdate an event indicating the name of the vertex, and its updated state.
+   *                    Additional information may be available for specific events. Look at the
+   *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   */
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
index e4e15ef..fe82b54 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
@@ -18,11 +18,17 @@
 
 package org.apache.tez.runtime.api;
 
+import javax.annotation.Nullable;
+import java.util.EnumSet;
+import java.util.Set;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 
 /**
  * A context that provides information to the {@link InputInitializer}
@@ -98,6 +104,17 @@ public interface InputInitializerContext {
    * @param vertexName
    * @return Total number of tasks in this vertex
    */
-  public int getVertexNumTasks(String vertexName);
+  int getVertexNumTasks(String vertexName);
+
+  /**
+   * Register to get notifications on updates to the specified vertex. Notifications will be sent
+   * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}.
+   * <p/>
+   * This method can only be invoked once per vertex. Duplicate invocations will result in an error.
+   *
+   * @param vertexName the vertex name for which notifications are required.
+   * @param stateSet   the set of states for which notifications are required. null implies all
+   */
+  void registerForVertexStatusUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index f8e68bd..770761e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -18,10 +18,12 @@
 
 package org.apache.tez.dag.app.dag;
 
+import javax.annotation.Nullable;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +41,8 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.*;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -68,6 +72,7 @@ public class RootInputInitializerManager {
   private final EventHandler eventHandler;
   private volatile boolean isStopped = false;
   private final UserGroupInformation dagUgi;
+  private final StateChangeNotifier entityStateTracker;
 
   private final Vertex vertex;
   private final AppContext appContext;
@@ -75,7 +80,7 @@ public class RootInputInitializerManager {
   private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
 
   public RootInputInitializerManager(Vertex vertex, AppContext appContext,
-                                     UserGroupInformation dagUgi) {
+                                     UserGroupInformation dagUgi, StateChangeNotifier stateTracker) {
     this.appContext = appContext;
     this.vertex = vertex;
     this.eventHandler = appContext.getEventHandler();
@@ -83,6 +88,7 @@ public class RootInputInitializerManager {
         .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
+    this.entityStateTracker = stateTracker;
   }
   
   public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
@@ -90,10 +96,11 @@ public class RootInputInitializerManager {
     for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
 
       InputInitializerContext context =
-          new TezRootInputInitializerContextImpl(input, vertex, appContext);
+          new TezRootInputInitializerContextImpl(input, vertex, appContext, this);
       InputInitializer initializer = createInitializer(input, context);
 
-      InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex);
+      InitializerWrapper initializerWrapper =
+          new InitializerWrapper(input, initializer, context, vertex, entityStateTracker);
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -101,7 +108,6 @@ public class RootInputInitializerManager {
     }
   }
 
-
   @VisibleForTesting
   protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
       input, InputInitializerContext context) {
@@ -139,6 +145,14 @@ public class RootInputInitializerManager {
     }
   }
 
+  public void registerForVertexUpdates(String vertexName, String inputName,
+                                       @Nullable Set<org.apache.tez.dag.api.event.VertexState> stateSet) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+    InitializerWrapper initializer = initializerMap.get(inputName); // null for an unknown input
+    Preconditions.checkNotNull(initializer, "No initializer registered for input: " + inputName);
+    initializer.registerForVertexStateUpdates(vertexName, stateSet);
+  }
+
   @VisibleForTesting
   protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
     return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
@@ -218,7 +232,7 @@ public class RootInputInitializerManager {
     }
   }
 
-  private static class InitializerWrapper {
+  private static class InitializerWrapper implements VertexStateUpdateListener {
 
 
     private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
@@ -226,14 +240,17 @@ public class RootInputInitializerManager {
     private final InputInitializerContext context;
     private final AtomicBoolean isComplete = new AtomicBoolean(false);
     private final String vertexLogIdentifier;
+    private final StateChangeNotifier stateChangeNotifier;
+    private final List<String> notificationRegisteredVertices = Lists.newArrayList();
 
     InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
                        InputInitializer initializer, InputInitializerContext context,
-                       Vertex vertex) {
+                       Vertex vertex, StateChangeNotifier stateChangeNotifier) {
       this.input = input;
       this.initializer = initializer;
       this.context = context;
       this.vertexLogIdentifier = vertex.getLogIdentifier();
+      this.stateChangeNotifier = stateChangeNotifier;
     }
 
     public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
@@ -254,6 +271,36 @@ public class RootInputInitializerManager {
 
     public void setComplete() {
       this.isComplete.set(true);
+      unregisterForVertexStatusUpdates();
+    }
+
+    public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
+      synchronized(notificationRegisteredVertices) {
+        notificationRegisteredVertices.add(vertexName);
+      }
+      stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this);
+    }
+
+    private void unregisterForVertexStatusUpdates() {
+      synchronized (notificationRegisteredVertices) {
+        for (String vertexName : notificationRegisteredVertices) {
+          stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
+        }
+
+      }
+    }
+
+    @Override
+    public void onStateUpdated(VertexStateUpdate event) {
+      if (isComplete()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
+              event.getVertexState() +
+              " since initializer " + input.getName() + " is already complete.");
+        }
+      } else {
+        initializer.onVertexStateUpdated(event);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
new file mode 100644
index 0000000..558fc61
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Tracks status updates from various components, and informs registered components about updates.
+ */
+@InterfaceAudience.Private
+public class StateChangeNotifier {
+
+  private final DAG dag;
+  private final SetMultimap<TezVertexID, ListenerContainer> vertexListeners;
+  private final ListMultimap<TezVertexID, VertexStateUpdate> lastKnowStatesMap;
+  private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
+  // Only the write lock is used: every guarded section mutates or replays shared state.
+  private final ReentrantReadWriteLock.WriteLock writeLock = listenersLock.writeLock();
+
+  public StateChangeNotifier(DAG dag) {
+    this.dag = dag;
+    this.vertexListeners = Multimaps.synchronizedSetMultimap(
+        HashMultimap.<TezVertexID, ListenerContainer>create());
+    this.lastKnowStatesMap = LinkedListMultimap.create();
+  }
+
+  /** Registers a listener for a vertex and replays matching updates already recorded for it. */
+  public void registerForVertexUpdates(String vertexName,
+                                       Set<org.apache.tez.dag.api.event.VertexState> stateSet,
+                                       VertexStateUpdateListener listener) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+    Vertex vertex = dag.getVertex(vertexName);
+    Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+    TezVertexID vertexId = vertex.getVertexId();
+    writeLock.lock();
+    try {
+      // Read within the lock, to ensure a consistent view is seen.
+      List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
+      ListenerContainer listenerContainer = new ListenerContainer(listener, stateSet);
+      Set<ListenerContainer> listenerContainers = vertexListeners.get(vertexId);
+      if (listenerContainers == null || !listenerContainers.contains(listenerContainer)) {
+        vertexListeners.put(vertexId, listenerContainer);
+        // Replay every previously recorded update to the new listener (no-op if there are none).
+        // Sent from within the lock to avoid duplicate events, and out of order events.
+        for (VertexStateUpdate update : previousUpdates) {
+          listenerContainer.sendStateUpdate(update);
+        }
+      } else {
+        // Disallow multiple register calls.
+        throw new TezUncheckedException(
+            "Only allowed to register once for a listener. CurrentContext: vertexName=" +
+                vertexName + ", Listener: " + listener);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Removes the given listener from the listeners registered for the named vertex. */
+  public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
+    Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+    Vertex vertex = dag.getVertex(vertexName);
+    Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+    TezVertexID vertexId = vertex.getVertexId();
+    writeLock.lock();
+    try {
+      ListenerContainer listenerContainer = new ListenerContainer(listener, null);
+      vertexListeners.remove(vertexId, listenerContainer);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Records a state update for the vertex and forwards it to the registered listeners. */
+  public void stateChanged(TezVertexID vertexId, VertexStateUpdate vertexStateUpdate) {
+    // Exclusive lock: the put below mutates lastKnowStatesMap, which is not thread-safe.
+    writeLock.lock();
+    try {
+      lastKnowStatesMap.put(vertexId, vertexStateUpdate);
+      if (vertexListeners.containsKey(vertexId)) {
+        sendStateUpdate(vertexId, vertexStateUpdate);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Delivers the update to every listener registered for the vertex. */
+  private void sendStateUpdate(TezVertexID vertexId,
+                               VertexStateUpdate event) {
+    for (ListenerContainer listenerContainer : vertexListeners.get(vertexId)) {
+      listenerContainer.sendStateUpdate(event);
+    }
+  }
+
+  /** Pairs a listener with the states it asked for; equality is based on listener identity. */
+  private static final class ListenerContainer {
+    final VertexStateUpdateListener listener;
+    final Set<org.apache.tez.dag.api.event.VertexState> states;
+
+    private ListenerContainer(VertexStateUpdateListener listener,
+                              Set<org.apache.tez.dag.api.event.VertexState> states) {
+      this.listener = listener;
+      if (states == null) {
+        this.states = EnumSet.allOf(org.apache.tez.dag.api.event.VertexState.class);
+      } else {
+        this.states = states;
+      }
+    }
+
+    private void sendStateUpdate(VertexStateUpdate stateUpdate) {
+      if (states.contains(stateUpdate.getVertexState())) {
+        listener.onStateUpdated(stateUpdate);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ListenerContainer that = (ListenerContainer) o;
+
+      // Explicit reference comparison
+      return listener == that.listener;
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(listener);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
new file mode 100644
index 0000000..f636c5d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+
+/**
+ * This interface should not be implemented by user facing APIs such as InputInitializer.
+ */
+@InterfaceAudience.Private
+public interface VertexStateUpdateListener {
+  void onStateUpdated(VertexStateUpdate event);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 24b41a5..680e31a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.DAGReport;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
@@ -153,6 +154,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final AppContext appContext;
   private final UserGroupInformation dagUGI;
   private final ACLManager aclManager;
+  private final StateChangeNotifier entityUpdateTracker;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
   private Map<String, Edge> edges = new HashMap<String, Edge>();
@@ -434,6 +436,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+    this.entityUpdateTracker = new StateChangeNotifier(this);
   }
 
   protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
@@ -1243,7 +1246,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.eventHandler, dag.taskAttemptListener,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-        dag.vertexGroups, dag.taskSpecificLaunchCmdOption);
+        dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index a9b9bca..846d208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -20,13 +20,17 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Set;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.runtime.api.InputInitializerContext;
 
@@ -36,18 +40,23 @@ public class TezRootInputInitializerContextImpl implements
   private RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
   private final Vertex vertex;
   private final AppContext appContext;
+  private final RootInputInitializerManager manager;
+
 
   // TODO Add support for counters - merged with the Vertex counters.
 
   public TezRootInputInitializerContextImpl(
       RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
-      Vertex vertex, AppContext appContext) {
+      Vertex vertex, AppContext appContext,
+      RootInputInitializerManager manager) {
     checkNotNull(input, "input is null");
     checkNotNull(vertex, "vertex is null");
     checkNotNull(appContext, "appContext is null");
+    checkNotNull(manager, "initializerManager is null");
     this.input = input;
     this.vertex = vertex;
     this.appContext = appContext;
+    this.manager = manager;
   }
 
   @Override
@@ -105,4 +114,17 @@ public class TezRootInputInitializerContextImpl implements
     return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
   }
 
+  /**
+   * Registers for state updates of the named vertex on behalf of this context's input. The
+   * request is handed to the {@link RootInputInitializerManager} together with the input name.
+   *
+   * @param vertexName name of the vertex whose updates are requested
+   * @param stateSet states of interest, passed to the manager unchanged
+   */
+  @Override
+  public void registerForVertexStatusUpdates(String vertexName, Set<VertexState> stateSet) {
+    final String inputName = input.getName();
+    manager.registerForVertexUpdates(vertexName, inputName, stateSet);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5787a11..ff556ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -77,6 +77,8 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -86,6 +88,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
@@ -161,6 +164,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
 
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
  * The read and write calls use ReadWriteLock for concurrency.
@@ -207,6 +212,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private int distanceFromRoot = 0;
 
   private final List<String> diagnostics = new ArrayList<String>();
+
+  protected final StateChangeNotifier stateChangeNotifier;
   
   //task/attempt related datastructures
   @VisibleForTesting
@@ -227,6 +234,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private static final SourceTaskAttemptCompletedEventTransition
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
+  private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK =
+      new VertexStateChangedCallback();
 
   private VertexState recoveredState = VertexState.NEW;
   private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
@@ -527,8 +536,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // create the topology tables
           .installTopology();
 
-  private final StateMachine<VertexState, VertexEventType, VertexEvent>
-      stateMachine;
+  /**
+   * Registers the shared state-entered callback for each vertex state that is reported
+   * through the {@link StateChangeNotifier}: SUCCEEDED, FAILED, KILLED and RUNNING.
+   */
+  private void augmentStateMachine() {
+    final VertexState[] notifiedStates = {
+        VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.RUNNING};
+    for (VertexState notifiedState : notifiedStates) {
+      stateMachine.registerStateEnteredCallback(notifiedState, STATE_CHANGED_CALLBACK);
+    }
+  }
+
+  private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
 
   //changing fields while the vertex is running
   private int numTasks;
@@ -617,7 +637,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       TaskAttemptListener taskAttemptListener, Clock clock,
       TaskHeartbeatHandler thh, boolean commitVertexOutputs,
       AppContext appContext, VertexLocationHint vertexLocationHint,
-      Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption) {
+      Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
+      StateChangeNotifier entityStatusTracker) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
@@ -663,17 +684,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (vertexPlan.getOutputsCount() > 0) {
       setAdditionalOutputs(vertexPlan.getOutputsList());
     }
+    this.stateChangeNotifier = entityStatusTracker;
 
     // Setup the initial parallelism early. This may be changed after
     // initialization or on a setParallelism call.
     this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
+    // Not sending the notifier a parallelism update since this is the initial parallelism
 
     this.dagVertexGroups = dagVertexGroups;
 
     logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
-    stateMachine = stateMachineFactory.make(this);
+
+    stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
+        stateMachineFactory.make(this), this);
+    augmentStateMachine();
   }
 
   protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
@@ -1003,7 +1029,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (updatedEvent.getVertexLocationHint() != null) {
           setTaskLocationHints(updatedEvent.getVertexLocationHint());
         }
+        int oldNumTasks = numTasks;
         numTasks = updatedEvent.getNumTasks();
+        stateChangeNotifier.stateChanged(vertexId,
+            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(),
           updatedEvent.getRootInputSpecUpdates());
         if (LOG.isDebugEnabled()) {
@@ -1189,7 +1218,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           this.rootInputSpecs.putAll(rootInputSpecUpdates);
         }
+        int oldNumTasks = numTasks;
         this.numTasks = parallelism;
+        stateChangeNotifier.stateChanged(vertexId,
+            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         this.createTasks();
         LOG.info("Vertex " + getVertexId() + 
             " parallelism set to " + parallelism);
@@ -1251,7 +1283,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         LOG.info("Vertex " + logIdentifier + 
             " parallelism set to " + parallelism + " from " + numTasks);
+        int oldNumTasks = numTasks;
         this.numTasks = parallelism;
+        stateChangeNotifier.stateChanged(vertexId,
+            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         assert tasks.size() == numTasks;
   
         // set new edge managers
@@ -1816,9 +1851,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // For VertexManagers setting parallelism, the setParallelism call needs
     // to be inline.
     if (event != null) {
+      int oldNumTasks = numTasks;
       numTasks = event.getNumTasks();
+      stateChangeNotifier.stateChanged(vertexId,
+          new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
     } else {
       numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+      // Not sending a parallelism update notification since this is from the original plan
     }
 
     if (!(numTasks == -1 || numTasks >= 0)) {
@@ -2670,7 +2709,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       String dagName, String vertexName, TezVertexID vertexID,
       EventHandler eventHandler, int numTasks, int numNodes,
       Resource vertexTaskResource, Resource totalResource) {
-    return new RootInputInitializerManager(this, appContext, this.dagUgi);
+    return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier);
   }
   
   private boolean initializeVertexInInitializingState() {
@@ -3456,6 +3495,48 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  /**
+   * State-entered callback installed on the vertex state machine. Translates the internal
+   * state that was entered and hands it to the vertex's {@link StateChangeNotifier}.
+   */
+  private static class VertexStateChangedCallback
+      implements OnStateChangedCallback<VertexState, VertexImpl> {
+
+    @Override
+    public void onStateChanged(VertexImpl vertex, VertexState vertexState) {
+      TezVertexID notifiedVertexId = vertex.getVertexId();
+      String notifiedVertexName = vertex.getName();
+      org.apache.tez.dag.api.event.VertexState apiState =
+          convertInternalState(vertexState, vertex.getVertexId());
+      vertex.stateChangeNotifier.stateChanged(notifiedVertexId,
+          new VertexStateUpdate(notifiedVertexName, apiState));
+    }
+
+    /**
+     * Maps an internal vertex state onto its counterpart in the public API.
+     *
+     * @throws TezUncheckedException if the state is not one of RUNNING, SUCCEEDED, FAILED
+     *         or KILLED
+     */
+    private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState,
+                                                                          TezVertexID vertexId) {
+      switch (vertexState) {
+        case RUNNING:
+          return org.apache.tez.dag.api.event.VertexState.RUNNING;
+        case SUCCEEDED:
+          return org.apache.tez.dag.api.event.VertexState.SUCCEEDED;
+        case FAILED:
+          return org.apache.tez.dag.api.event.VertexState.FAILED;
+        case KILLED:
+          return org.apache.tez.dag.api.event.VertexState.KILLED;
+        default:
+          // NEW, INITIALIZING, INITED, ERROR, TERMINATING, RECOVERING and any other state.
+          throw new TezUncheckedException(
+              "Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId);
+      }
+    }
+  }
+
   @Override
   public void setInputVertices(Map<Vertex, Edge> inVertices) {
     this.sourceVertices = inVertices;

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java b/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
new file mode 100644
index 0000000..53767fc
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.state;
+
+/**
+ * Callback invoked by {@link StateMachineTez} when a state it was registered for is entered.
+ *
+ * @param <STATE> enum type describing the states of the state machine
+ * @param <OPERAND> type of the object that is handed to the callback
+ */
+public interface OnStateChangedCallback<STATE extends Enum<STATE>, OPERAND> {
+
+  /**
+   * Called after a transition has moved the state machine into {@code state}.
+   *
+   * @param operand the object the state machine was created with
+   * @param state the state that was just entered
+   */
+  void onStateChanged(OPERAND operand, STATE state);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
new file mode 100644
index 0000000..0fd3c0e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.state;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.tez.dag.records.TezID;
+
+/**
+ * A {@link StateMachine} decorator. Transitions are delegated to the wrapped state machine;
+ * when a transition changes the state, the callback registered for the new state is invoked.
+ *
+ * @param <STATE> enum type of the states
+ * @param <EVENTTYPE> enum type of the event types
+ * @param <EVENT> type of the events
+ * @param <OPERAND> type of the object handed to callbacks
+ */
+public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT, OPERAND>
+    implements StateMachine<STATE, EVENTTYPE, EVENT> {
+
+  // One callback per state; registering again for a state replaces the earlier callback.
+  private final Map<STATE, OnStateChangedCallback<STATE, ? super OPERAND>> callbackMap =
+      new HashMap<STATE, OnStateChangedCallback<STATE, ? super OPERAND>>();
+  private final OPERAND operand;
+
+  private final StateMachine<STATE, EVENTTYPE, EVENT> realStatemachine;
+
+  /**
+   * @param sm the state machine that performs the actual transitions
+   * @param operand the object passed to callbacks when a registered state is entered
+   */
+  public StateMachineTez(StateMachine<STATE, EVENTTYPE, EVENT> sm, OPERAND operand) {
+    this.realStatemachine = sm;
+    this.operand = operand;
+  }
+
+  /**
+   * Registers the callback to invoke when {@code state} is entered. A callback registered
+   * earlier for the same state is replaced.
+   *
+   * @return this instance, so that registrations can be chained
+   */
+  public StateMachineTez<STATE, EVENTTYPE, EVENT, OPERAND> registerStateEnteredCallback(
+      STATE state, OnStateChangedCallback<STATE, ? super OPERAND> callback) {
+    callbackMap.put(state, callback);
+    return this;
+  }
+
+  @Override
+  public STATE getCurrentState() {
+    return realStatemachine.getCurrentState();
+  }
+
+  /**
+   * Delegates the transition. Only if the resulting state differs from the state before the
+   * transition is the callback registered for the new state (if any) invoked.
+   */
+  @Override
+  public STATE doTransition(EVENTTYPE eventType, EVENT event) throws
+      InvalidStateTransitonException {
+    STATE oldState = realStatemachine.getCurrentState();
+    STATE newState = realStatemachine.doTransition(eventType, event);
+    if (newState != oldState) {
+      OnStateChangedCallback<STATE, ? super OPERAND> callback = callbackMap.get(newState);
+      if (callback != null) {
+        callback.onStateChanged(operand, newState);
+      }
+    }
+    return newState;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
new file mode 100644
index 0000000..6a505ef
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Unit tests for {@link StateChangeNotifier}, run against mocked DAG and Vertex instances.
+ */
+public class TestStateChangeNotifier {
+
+  /**
+   * Registration after updates were reported: the listener is handed the previously reported
+   * update when it matches the states it asked for, and is not called otherwise.
+   */
+  @Test(timeout = 5000)
+  public void testEventsOnRegistration() {
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    Vertex runningVertex = createMockVertex(dagId, 1);
+    Vertex silentVertex = createMockVertex(dagId, 2);
+    Vertex resizedVertex = createMockVertex(dagId, 3);
+    DAG dag = createMockDag(dagId, runningVertex, silentVertex, resizedVertex);
+    StateChangeNotifier notifier = new StateChangeNotifier(dag);
+
+    // The first vertex has already sent one update before anybody registers.
+    notifyTracker(notifier, runningVertex, VertexState.RUNNING);
+    VertexStateUpdateListener nullFilter = mock(VertexStateUpdateListener.class);
+    VertexStateUpdateListener allStates = mock(VertexStateUpdateListener.class);
+    VertexStateUpdateListener runningOnly = mock(VertexStateUpdateListener.class);
+    VertexStateUpdateListener succeededOnly = mock(VertexStateUpdateListener.class);
+    // A null state set registers for all states.
+    notifier.registerForVertexUpdates(runningVertex.getName(), null, nullFilter);
+    // An explicit set containing every state.
+    notifier.registerForVertexUpdates(
+        runningVertex.getName(), EnumSet.allOf(VertexState.class), allStates);
+    // A single state for which an update has been generated.
+    notifier.registerForVertexUpdates(
+        runningVertex.getName(), EnumSet.of(VertexState.RUNNING), runningOnly);
+    // A single state for which no update has been generated.
+    notifier.registerForVertexUpdates(
+        runningVertex.getName(), EnumSet.of(VertexState.SUCCEEDED), succeededOnly);
+
+    assertEquals(VertexState.RUNNING, onlyStateDeliveredTo(nullFilter));
+    assertEquals(VertexState.RUNNING, onlyStateDeliveredTo(allStates));
+    assertEquals(VertexState.RUNNING, onlyStateDeliveredTo(runningOnly));
+    verify(succeededOnly, never()).onStateUpdated(any(VertexStateUpdate.class));
+
+    // The second vertex has not notified of any state.
+    VertexStateUpdateListener silentListener = mock(VertexStateUpdateListener.class);
+    notifier.registerForVertexUpdates(silentVertex.getName(), null, silentListener);
+    verify(silentListener, never()).onStateUpdated(any(VertexStateUpdate.class));
+
+    // The third vertex has notified about a parallelism update only.
+    notifier.stateChanged(resizedVertex.getVertexId(),
+        new VertexStateUpdateParallelismUpdated(resizedVertex.getName(), 23, -1));
+    VertexStateUpdateListener resizedListener = mock(VertexStateUpdateListener.class);
+    notifier.registerForVertexUpdates(resizedVertex.getName(), null, resizedListener);
+    assertEquals(VertexState.PARALLELISM_UPDATED, onlyStateDeliveredTo(resizedListener));
+  }
+
+  /** A listener registered with a null state set is expected to see every update, in order. */
+  @Test(timeout = 5000)
+  public void testSimpleStateUpdates() {
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    Vertex vertex = createMockVertex(dagId, 1);
+    StateChangeNotifier notifier = new StateChangeNotifier(createMockDag(dagId, vertex));
+
+    VertexStateUpdateListener listener = mock(VertexStateUpdateListener.class);
+    notifier.registerForVertexUpdates(vertex.getName(), null, listener);
+
+    List<VertexState> reported = Lists.newArrayList(
+        VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED,
+        VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED);
+    for (VertexState state : reported) {
+      notifyTracker(notifier, vertex, state);
+    }
+
+    List<VertexStateUpdate> delivered = verifyAndCapture(listener, reported.size());
+    assertStatesInOrder(reported, reported.size(), delivered);
+  }
+
+  /** Registering the same listener twice is expected to fail with a TezUncheckedException. */
+  @Test(timeout = 5000)
+  public void testDuplicateRegistration() {
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    Vertex vertex = createMockVertex(dagId, 1);
+    StateChangeNotifier notifier = new StateChangeNotifier(createMockDag(dagId, vertex));
+    VertexStateUpdateListener listener = mock(VertexStateUpdateListener.class);
+
+    notifier.registerForVertexUpdates(vertex.getName(), null, listener);
+    try {
+      notifier.registerForVertexUpdates(vertex.getName(), null, listener);
+      fail("Expecting an error from duplicate registrations of the same listener");
+    } catch (TezUncheckedException e) {
+      // Expected, ignore
+    }
+  }
+
+  /** A listener registered for specific states is expected to be told only about those. */
+  @Test(timeout = 5000)
+  public void testSpecificStateUpdates() {
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    Vertex vertex = createMockVertex(dagId, 1);
+    StateChangeNotifier notifier = new StateChangeNotifier(createMockDag(dagId, vertex));
+
+    VertexStateUpdateListener listener = mock(VertexStateUpdateListener.class);
+    notifier.registerForVertexUpdates(
+        vertex.getName(), EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED), listener);
+
+    List<VertexState> reported = Lists.newArrayList(
+        VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED,
+        VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED);
+    List<VertexState> expected = Lists.newArrayList(
+        VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.RUNNING, VertexState.SUCCEEDED);
+    for (VertexState state : reported) {
+      notifyTracker(notifier, vertex, state);
+    }
+
+    List<VertexStateUpdate> delivered = verifyAndCapture(listener, expected.size());
+    assertStatesInOrder(expected, expected.size(), delivered);
+  }
+
+  /** Updates reported after a listener has unregistered are expected not to reach it. */
+  @Test(timeout = 5000)
+  public void testUnregister() {
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    Vertex vertex = createMockVertex(dagId, 1);
+    StateChangeNotifier notifier = new StateChangeNotifier(createMockDag(dagId, vertex));
+
+    VertexStateUpdateListener listener = mock(VertexStateUpdateListener.class);
+    notifier.registerForVertexUpdates(vertex.getName(), null, listener);
+
+    List<VertexState> reported = Lists.newArrayList(
+        VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED,
+        VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED);
+
+    int numExpectedEvents = 3;
+    for (int sent = 0; sent < reported.size(); sent++) {
+      if (sent == numExpectedEvents) {
+        notifier.unregisterForVertexUpdates(vertex.getName(), listener);
+      }
+      notifyTracker(notifier, vertex, reported.get(sent));
+    }
+
+    List<VertexStateUpdate> delivered = verifyAndCapture(listener, numExpectedEvents);
+    assertStatesInOrder(reported, numExpectedEvents, delivered);
+  }
+
+  /** Verifies that the listener was called exactly once and returns the state it was given. */
+  private static VertexState onlyStateDeliveredTo(VertexStateUpdateListener listener) {
+    ArgumentCaptor<VertexStateUpdate> captor =
+        ArgumentCaptor.forClass(VertexStateUpdate.class);
+    verify(listener, times(1)).onStateUpdated(captor.capture());
+    return captor.getValue().getVertexState();
+  }
+
+  /** Verifies the number of calls made to the listener and returns the captured updates. */
+  private static List<VertexStateUpdate> verifyAndCapture(VertexStateUpdateListener listener,
+                                                          int expectedCalls) {
+    ArgumentCaptor<VertexStateUpdate> captor =
+        ArgumentCaptor.forClass(VertexStateUpdate.class);
+    verify(listener, times(expectedCalls)).onStateUpdated(captor.capture());
+    return captor.getAllValues();
+  }
+
+  /** Asserts that the first {@code count} updates carry the first {@code count} states. */
+  private static void assertStatesInOrder(List<VertexState> expected, int count,
+                                          List<VertexStateUpdate> actual) {
+    Iterator<VertexState> expectedIter = expected.iterator();
+    for (int i = 0; i < count; i++) {
+      assertEquals(expectedIter.next(), actual.get(i).getVertexState());
+    }
+  }
+
+  private DAG createMockDag(TezDAGID dagId, Vertex... vertices) {
+    DAG mockDag = mock(DAG.class);
+    doReturn(dagId).when(mockDag).getID();
+    for (Vertex vertex : vertices) {
+      String vertexName = vertex.getName();
+      TezVertexID vertexId = vertex.getVertexId();
+      doReturn(vertex).when(mockDag).getVertex(vertexName);
+      doReturn(vertex).when(mockDag).getVertex(vertexId);
+    }
+    return mockDag;
+  }
+
+  private Vertex createMockVertex(TezDAGID dagId, int id) {
+    TezVertexID mockedId = TezVertexID.getInstance(dagId, id);
+    String mockedName = "vertex" + id;
+    Vertex mockVertex = mock(Vertex.class);
+    doReturn(mockedName).when(mockVertex).getName();
+    doReturn(mockedId).when(mockVertex).getVertexId();
+    return mockVertex;
+  }
+
+  private void notifyTracker(StateChangeNotifier notifier, Vertex v, VertexState state) {
+    VertexStateUpdate update = new VertexStateUpdate(v.getName(), state);
+    notifier.stateChanged(v.getVertexId(), update);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6e2bd9d..31aaf6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -103,6 +104,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -195,6 +197,7 @@ public class TestVertexImpl {
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
   private HistoryEventHandler historyEventHandler;
+  private StateChangeNotifier updateTracker;
   private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
   public static class CountingOutputCommitter extends OutputCommitter {
@@ -1633,16 +1636,17 @@ public class TestVertexImpl {
         if (customInitializer == null) {
           v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskAttemptListener,
-              clock, thh, appContext, locationHint, dispatcher);
+              clock, thh, appContext, locationHint, dispatcher, updateTracker);
         } else {
           v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskAttemptListener,
-              clock, thh, appContext, locationHint, dispatcher, customInitializer);
+              clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
         }
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
-            clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption);
+            clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
+            updateTracker);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -1693,6 +1697,7 @@ public class TestVertexImpl {
   }
 
   public void setupPreDagCreation() {
+    LOG.info("____________ RESETTING CURRENT DAG ____________");
     conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
@@ -1736,6 +1741,7 @@ public class TestVertexImpl {
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
     }
+    updateTracker = new StateChangeNotifier(dag);
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
       @Override
@@ -1773,7 +1779,7 @@ public class TestVertexImpl {
     for (Edge edge : edges.values()) {
       edge.initialize();
     }
-    
+
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
     taskEventDispatcher = new TaskEventDispatcher();
@@ -2926,6 +2932,42 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
   }
 
+  @Test(timeout = 10000)
+  public void testInputInitializerVertexStateUpdates() throws Exception {
+    // v2 running an Input initializer, which is subscribed to events on v1.
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null);
+    // Using the EventHandlingRootInputInitializer since it keeps the initializer alive till signalled,
+    // which is required to track events that it receives.
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+
+    initVertex(v1);
+    startVertex(v1);
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+
+    // Make v1 succeed
+    for (TezTaskID taskId : v1.getTasks().keySet()) {
+      v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+    }
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+
+    // At this point, 2 events should have been received - since the dispatcher is complete.
+    Assert.assertEquals(2, initializer.stateUpdateEvents.size());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+        initializer.stateUpdateEvents.get(0).getVertexState());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+        initializer.stateUpdateEvents.get(1).getVertexState());
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 10000)
   public void testRootInputInitializerEvent() throws Exception {
@@ -3171,7 +3213,8 @@ public class TestVertexImpl {
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener,
-          clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+          clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+          updateTracker);
       vertexIdMap.put(vId, v);
       vertices.put(v.getName(), v);
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
@@ -3202,10 +3245,12 @@ public class TestVertexImpl {
                                                  AppContext appContext,
                                                  VertexLocationHint vertexLocationHint,
                                                  DrainDispatcher dispatcher,
-                                                 InputInitializer presetInitializer) {
+                                                 InputInitializer presetInitializer,
+                                                 StateChangeNotifier updateTracker) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
-          appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+          appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+          updateTracker);
       this.presetInitializer = presetInitializer;
     }
 
@@ -3217,7 +3262,7 @@ public class TestVertexImpl {
       try {
         rootInputInitializerManager =
             new RootInputInitializerManagerWithRunningInitializer(this, this.getAppContext(),
-                presetInitializer);
+                presetInitializer, stateChangeNotifier);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -3239,10 +3284,12 @@ public class TestVertexImpl {
                                                       Clock clock, TaskHeartbeatHandler thh,
                                                       AppContext appContext,
                                                       VertexLocationHint vertexLocationHint,
-                                                      DrainDispatcher dispatcher) {
+                                                      DrainDispatcher dispatcher,
+                                                      StateChangeNotifier updateTracker) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
-          appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+          appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+          updateTracker);
       this.dispatcher = dispatcher;
     }
 
@@ -3254,7 +3301,7 @@ public class TestVertexImpl {
       try {
         rootInputInitializerManager =
             new RootInputInitializerManagerControlled(this, this.getAppContext(), eventHandler,
-                dispatcher);
+                dispatcher, stateChangeNotifier);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -3273,9 +3320,10 @@ public class TestVertexImpl {
     private final InputInitializer presetInitializer;
 
     public RootInputInitializerManagerWithRunningInitializer(Vertex vertex, AppContext appContext,
-                                                             InputInitializer presetInitializer) throws
+                                                             InputInitializer presetInitializer,
+                                                             StateChangeNotifier tracker) throws
         IOException {
-      super(vertex, appContext, UserGroupInformation.getCurrentUser());
+      super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
       this.presetInitializer = presetInitializer;
     }
 
@@ -3284,6 +3332,9 @@ public class TestVertexImpl {
     protected InputInitializer createInitializer(
         RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
         InputInitializerContext context) {
+      if (presetInitializer instanceof ContextSettableInputInitialzier) {
+        ((ContextSettableInputInitialzier)presetInitializer).setContext(context);
+      }
       return presetInitializer;
     }
   }
@@ -3300,9 +3351,10 @@ public class TestVertexImpl {
 
     public RootInputInitializerManagerControlled(Vertex vertex, AppContext appContext,
                                                  EventHandler eventHandler,
-                                                 DrainDispatcher dispatcher
+                                                 DrainDispatcher dispatcher,
+                                                 StateChangeNotifier tracker
     ) throws IOException {
-      super(vertex, appContext, UserGroupInformation.getCurrentUser());
+      super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
       this.eventHandler = eventHandler;
       this.dispatcher = dispatcher;
       this.vertexID = vertex.getVertexId();
@@ -3590,7 +3642,8 @@ public class TestVertexImpl {
   }
 
   @InterfaceAudience.Private
-  public static class EventHandlingRootInputInitializer extends InputInitializer {
+  public static class EventHandlingRootInputInitializer extends InputInitializer
+      implements ContextSettableInputInitialzier {
 
     final AtomicBoolean initStarted = new AtomicBoolean(false);
     final AtomicBoolean eventReceived = new AtomicBoolean(false);
@@ -3599,6 +3652,9 @@ public class TestVertexImpl {
     private final ReentrantLock lock = new ReentrantLock();
     private final Condition eventCondition = lock.newCondition();
 
+    private final List<VertexStateUpdate> stateUpdateEvents = new LinkedList<VertexStateUpdate>();
+    private volatile InputInitializerContext context;
+
     public EventHandlingRootInputInitializer(
         InputInitializerContext initializerContext) {
       super(initializerContext);
@@ -3606,6 +3662,7 @@ public class TestVertexImpl {
 
     @Override
     public List<Event> initialize() throws Exception {
+      context.registerForVertexStatusUpdates("vertex1", null);
       initStarted.set(true);
       lock.lock();
       try {
@@ -3632,5 +3689,18 @@ public class TestVertexImpl {
         lock.unlock();
       }
     }
+
+    @Override
+    public void setContext(InputInitializerContext context) {
+      this.context = context;
+    }
+
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+      stateUpdateEvents.add(stateUpdate);
+    }
+  }
+
+  private interface ContextSettableInputInitialzier {
+    void setContext(InputInitializerContext context);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index b1a0880..55f9b11 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -29,13 +29,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -208,6 +212,11 @@ public class TestMRInputSplitDistributor {
     }
 
     @Override
+    public void registerForVertexStatusUpdates(String vertexName, Set<VertexState> stateSet) {
+      throw new UnsupportedOperationException("registerForVertexStatusUpdates not implemented in this mock");
+    }
+
+    @Override
     public UserPayload getUserPayload() {
       throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
     }


[2/2] git commit: Update CHANGES.txt for TEZ-1447 (cherry picked from commit 44e6a77cc5e6e09e97c5b1525428bf67967c1d25)

Posted by ss...@apache.org.
Update CHANGES.txt for TEZ-1447
(cherry picked from commit 44e6a77cc5e6e09e97c5b1525428bf67967c1d25)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d9ea4353
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d9ea4353
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d9ea4353

Branch: refs/heads/branch-0.5
Commit: d9ea43537bb9333de5213e56d4367bf0c36cb27a
Parents: 88acd71
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Sep 7 20:23:05 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Sep 7 20:23:51 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d9ea4353/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2c075c..0bdb9d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ ALL CHANGES
   TEZ-1527. Fix indentation of Vertex status in DAGClient output.
   TEZ-1536. Fix spelling typo "configurartion" in TezClientUtils.
   TEZ-1310. Update website documentation framework
+  TEZ-1447. Provide a mechanism for InputInitializers to know about Vertex state changes.
 
 Release 0.5.0: Unreleased