You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/11/04 22:09:14 UTC

[2/2] git commit: TEZ-1547. Make use of state change notifier in VertexManagerPlugins and fix TEZ-1494 without latency penalty (bikas) (cherry picked from commit cd0ed751ae502492733738f2d7d7d0ae8e4224e9)

TEZ-1547. Make use of state change notifier in VertexManagerPlugins and fix TEZ-1494 without latency penalty (bikas)
(cherry picked from commit cd0ed751ae502492733738f2d7d7d0ae8e4224e9)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bf8d7a5e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bf8d7a5e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bf8d7a5e

Branch: refs/heads/branch-0.5
Commit: bf8d7a5e25b506985f7e0b358b63d4fb725b2972
Parents: 6e6b55d
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 4 13:07:06 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Nov 4 13:08:20 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/api/VertexManagerPlugin.java |  21 ++
 .../tez/dag/api/VertexManagerPluginContext.java |  50 ++++
 .../apache/tez/dag/api/event/VertexState.java   |  17 +-
 .../tez/runtime/api/InputInitializer.java       |   4 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   5 +-
 .../event/VertexEventManagerUserCodeError.java  |  36 +++
 .../tez/dag/app/dag/event/VertexEventType.java  |   1 +
 .../dag/impl/ImmediateStartVertexManager.java   |  93 ++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 180 ++++++++++--
 .../tez/dag/app/dag/impl/VertexManager.java     | 150 ++++++++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   7 -
 .../impl/TestImmediateStartVertexManager.java   | 131 +++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 279 ++++++++++++++-----
 .../tez/dag/app/dag/impl/TestVertexManager.java |   5 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    |  48 ++++
 .../tez/examples/SortMergeJoinExample.java      |  16 +-
 .../vertexmanager/ShuffleVertexManager.java     |  60 +++-
 .../vertexmanager/TestShuffleVertexManager.java | 122 +++++---
 .../org/apache/tez/test/TestAMRecovery.java     |   5 +-
 20 files changed, 1005 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 882aa07..ee3f358 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,8 @@ ALL CHANGES:
   TEZ-1731. OnDiskMerger can end up clobbering files across tasks with LocalDiskFetch enabled.
   TEZ-1735. Allow setting basic info per DAG for Tez UI.
   TEZ-1728. Remove local host name from Fetcher thread name.
+  TEZ-1547. Make use of state change notifier in VertexManagerPlugins and fix
+  TEZ-1494 without latency penalty
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index b494b12..6aa18d6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
@@ -99,4 +100,24 @@ public abstract class VertexManagerPlugin {
   public final VertexManagerPluginContext getContext() {
     return this.context;
   }
+  
+  /**
+   * Receive notifications on vertex state changes.
+   * <p/>
+   * State changes will be received based on the registration via
+   * {@link VertexManagerPluginContext#registerForVertexStateUpdates(String, java.util.Set)}
+   * . Notifications will be received for all registered state changes, and not
+   * just for the latest state update. They will be in order in which the state
+   * change occurred.
+   * </p><br>This method may be invoked concurrently with {@link #onVertexStarted(Map)} etc. and 
+   * multi-threading/concurrency implications must be considered.
+   *  
+   * @param stateUpdate
+   *          an event indicating the name of the vertex, and its updated
+   *          state. Additional information may be available for specific
+   *          events. Look at the type hierarchy for
+   *          {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   */
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+  }  
  }

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index c1f4bcd..dfa9287 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 
@@ -169,4 +170,53 @@ public interface VertexManagerPluginContext {
    * @return DAG Attempt number
    */
   public int getDAGAttemptNumber();
+  
+  /**
+   * Register to get notifications on updates to the specified vertex. Notifications will be sent
+   * via {@link VertexManagerPlugin#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
+   *
+   * This method can only be invoked once. Duplicate invocations will result in an error.
+   *
+   * @param vertexName the vertex name for which notifications are required.
+   * @param stateSet   the set of states for which notifications are required. null implies all
+   */
+  void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+  
+  /**
+   * Optional API. No need to call this when the vertex is not fully defined to
+   * start with. E.g. vertex parallelism is not defined, or edges are not
+   * configured. In that case, Tez will assume that the vertex needs
+   * reconfiguration. If the vertex is already fully defined, but the
+   * {@link VertexManagerPlugin} wants to reconfigure the vertex, then it must
+   * use this API to inform Tez about its intention. Without invoking this
+   * method, it is invalid to re-configure the vertex, e.g. via the
+   * {@link #setVertexParallelism(int, VertexLocationHint, Map, Map)} method if
+   * the vertex is already fully defined. This can be invoked at any time until
+   * {@link VertexManagerPlugin#initialize()} has completed. It is invalid to
+   * invoke this method after {@link VertexManagerPlugin#initialize()} has
+   * completed<br>
+   * If this API is invoked, then {@link #doneReconfiguringVertex()} must be
+   * invoked after the {@link VertexManagerPlugin} is done reconfiguring the
+   * vertex. Actions like scheduling tasks or sending events do not count as
+   * reconfiguration.
+   */
+  public void vertexReconfigurationPlanned();
+  
+  /**
+   * Optional API. This needs to be called only if {@link #vertexReconfigurationPlanned()} has been 
+   * invoked. This must be called after {@link #vertexReconfigurationPlanned()} is called.
+   */
+  public void doneReconfiguringVertex();
+  
+  /**
+   * Optional API. This API can be invoked to declare that the
+   * {@link VertexManagerPlugin} is done with its work. After this the system
+   * will not invoke the plugin methods any more. It is invalid for the plugin to
+   * make further invocations of the context APIs after this. This can be used
+   * to stop receiving further {@link VertexState} notifications after the
+   * plugin has made all changes.
+   */
+  // TODO must be done later after TEZ-1714
+  //public void vertexManagerDone();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
index ab296a5..c9c2d58 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum VertexState {
   /**
-   * Indicates that the Vertex had entered the SUCCEEDED state. A vertex could go back into RUNNING state after SUCCEEDING
+   * Indicates that the Vertex had entered the SUCCEEDED state. A vertex could
+   * go back into RUNNING state after SUCCEEDING
    */
   SUCCEEDED,
   /**
-   * Indicates that the Vertex had entered the RUNNING state. This state can be reached after SUCCEEDED, if some
-   * tasks belonging to the vertex are restarted due to errors
+   * Indicates that the Vertex had entered the RUNNING state. This state can be
+   * reached after SUCCEEDED, if some tasks belonging to the vertex are
+   * restarted due to errors
    */
   RUNNING,
   /**
@@ -47,5 +49,12 @@ public enum VertexState {
   /**
    * Indicates that the parallelism for the vertex had changed.
    */
-  PARALLELISM_UPDATED
+  PARALLELISM_UPDATED,
+  /**
+   * Indicates that the vertex has been completely configured. Parallelism, edges, edge
+   * properties, inputs/outputs have been set and will not be changed any
+   * further. Listeners can depend on the vertex's configured state after
+   * receiving this notification.
+   */
+  CONFIGURED
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index d9d6517..cc33205 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.api;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -97,7 +98,8 @@ public abstract class InputInitializer {
    *
    * Extensive processing should not be performed via this method call. Instead this should just be
    * used as a notification mechanism to the main initialization, which is via the initialize method.
-   *
+   * <br>This method may be invoked concurrently with {@link #initialize()} etc. and 
+   * multi-threading/concurrency implications must be considered.
    * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
    *                    Additional information may be available for specific events, Look at the
    *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index fa1f2c4..cfedc41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -89,8 +89,11 @@ public interface Vertex extends Comparable<Vertex> {
 
   void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException;
+      Map<String, InputSpecUpdate> rootInputSpecUpdate, boolean fromVertexManager)
+      throws AMUserCodeException;
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
+  void vertexReconfigurationPlanned();
+  void doneReconfiguringVertex();
 
   // CHANGE THESE TO LISTS AND MAINTAIN ORDER?
   void setInputVertices(Map<Vertex, Edge> inVertices);

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
new file mode 100644
index 0000000..022620a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventManagerUserCodeError extends VertexEvent {
+  final AMUserCodeException e;
+  
+  public VertexEventManagerUserCodeError(TezVertexID vertexId, AMUserCodeException e) {
+    super(vertexId, VertexEventType.V_MANAGER_USER_CODE_ERROR);
+    this.e = e;
+  }
+  
+  public AMUserCodeException getError() {
+    return e;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index e649095..b4f7e29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -42,6 +42,7 @@ public enum VertexEventType {
   
   //Producer:Any component
   V_INTERNAL_ERROR,
+  V_MANAGER_USER_CODE_ERROR,
   
   V_ROUTE_EVENT,
   V_ONE_TO_ONE_SOURCE_SPLIT,

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 773426b..00b5306 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -27,11 +29,15 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Starts all tasks immediately on vertex start
@@ -40,18 +46,10 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(ImmediateStartVertexManager.class);
 
-  private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
+  private final Map<String, Boolean> srcVertexConfigured = Maps.newConcurrentMap();
   private int managedTasks;
   private boolean tasksScheduled = false;
-
-  class SourceVertexInfo {
-    EdgeProperty edgeProperty;
-    int numFinishedTasks;
-
-    SourceVertexInfo(EdgeProperty edgeProperty) {
-      this.edgeProperty = edgeProperty;
-    }
-  }
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
 
   public ImmediateStartVertexManager(VertexManagerPluginContext context) {
     super(context);
@@ -63,37 +61,35 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
     Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
     for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
       String srcVertex = entry.getKey();
-      EdgeProperty edgeProp = entry.getValue();
-      LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
       //track vertices with task count > 0
       if (getContext().getVertexNumTasks(srcVertex) > 0) {
-        srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+        LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
+        srcVertexConfigured.put(srcVertex, false);
+        getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
       } else {
         LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertex
             + " as it has got 0 tasks");
       }
     }
-
-    //handle completions
-    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
-      for (Integer task : entry.getValue()) {
-        handleSourceTaskFinished(entry.getKey(), task);
-      }
-    }
+    onVertexStartedDone.set(true);
     scheduleTasks();
   }
 
-  private void handleSourceTaskFinished(String vertex, Integer taskId) {
-    SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
-    //Not mandatory to check for duplicate completions here
-    srcInfo.numFinishedTasks++;
-  }
-
   private void scheduleTasks() {
-    if (!canScheduleTasks()) {
+    if (!onVertexStartedDone.get()) {
+      // vertex not started yet
+      return;
+    }
+    if (tasksScheduled) {
+      // already scheduled
       return;
     }
 
+    if (!canScheduleTasks()) {
+      return;
+    }
+    
+    tasksScheduled = true;
     List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
     for (int i = 0; i < managedTasks; ++i) {
       tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
@@ -103,35 +99,42 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
       LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
       getContext().scheduleVertexTasks(tasksToStart);
     }
-    tasksScheduled = true;
+    // all tasks scheduled. Can call vertexManagerDone().
+    // TODO TEZ-1714 for locking issues getContext().vertexManagerDone();
   }
 
   private boolean canScheduleTasks() {
-    //Check if at least 1 task is finished from each source vertex (in case of broadcast &
-    // one-to-one or custom)
-    for (Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
-      SourceVertexInfo srcVertexInfo = entry.getValue();
-      switch(srcVertexInfo.edgeProperty.getDataMovementType()) {
-      case ONE_TO_ONE:
-      case BROADCAST:
-      case CUSTOM:
-        if (srcVertexInfo.numFinishedTasks == 0) {
-          //do not schedule tasks until a task from source task is complete
-          return false;
+    // check for source vertices completely configured
+    for (Map.Entry<String, Boolean> entry : srcVertexConfigured.entrySet()) {
+      if (!entry.getValue().booleanValue()) {
+        // vertex not configured
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: " + getContext().getVertexName());
         }
-      default:
-        break;
+        return false;
       }
     }
+
     return true;
   }
+  
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED,
+        "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: "
+            + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+    Preconditions.checkArgument(srcVertexConfigured.containsKey(stateUpdate.getVertexName()),
+        "Received incorrect vertex notification : " + stateUpdate.getVertexState() + " for vertex: "
+            + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+    Preconditions.checkState(srcVertexConfigured.put(stateUpdate.getVertexName(), true)
+        .booleanValue() == false);
+    LOG.info("Received configured notification: " + stateUpdate.getVertexState() + " for vertex: "
+        + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+    scheduleTasks();
+  }
 
   @Override
   public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
-    handleSourceTaskFinished(srcVertexName, attemptId);
-    if (!tasksScheduled) {
-      scheduleTasks();
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4a88949..593ecca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
@@ -320,7 +321,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               (VertexState.RECOVERING, VertexState.RECOVERING,
                   VertexEventType.V_TERMINATE,
                   new TerminateDuringRecoverTransition())
-
+          .addTransition
+              (VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING),
+                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
+                  new VertexManagerUserCodeErrorTransition())
+          
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
@@ -353,6 +358,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED),
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
+          .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED),
+              VertexEventType.V_MANAGER_USER_CODE_ERROR,
+              new VertexManagerUserCodeErrorTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
               new TerminateInitingVertexTransition())
@@ -399,6 +407,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
               new TerminateInitedVertexTransition())
+          .addTransition(VertexState.INITED, EnumSet.of(VertexState.FAILED),
+              VertexEventType.V_MANAGER_USER_CODE_ERROR,
+              new VertexManagerUserCodeErrorTransition())
           .addTransition(VertexState.INITED, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
@@ -429,6 +440,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
               VertexEventType.V_TERMINATE,
               new VertexKilledTransition())
+          .addTransition(VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING),
+              VertexEventType.V_MANAGER_USER_CODE_ERROR,
+              new VertexManagerUserCodeErrorTransition())
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
               VertexEventType.V_TASK_RESCHEDULED,
               new TaskRescheduledTransition())
@@ -460,6 +474,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -512,6 +527,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_TASK_RESCHEDULED,
@@ -534,6 +550,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.KILLED, VertexState.KILLED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_INIT,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
@@ -559,6 +576,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TERMINATE,
+                  VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -627,6 +645,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private final String vertexName;
   private final ProcessorDescriptor processorDescriptor;
+  
+  private boolean vertexToBeReconfiguredByManager = false;
+  AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+  AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
@@ -1212,14 +1234,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates) throws AMUserCodeException {
-    setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, false);
+      Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager)
+      throws AMUserCodeException {
+    setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
+        false, fromVertexManager);
   }
 
   private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdates,
-      boolean recovering) throws AMUserCodeException {
+      boolean recovering, boolean fromVertexManager) throws AMUserCodeException {
     if (recovering) {
       writeLock.lock();
       try {
@@ -1255,6 +1279,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     + parallelism + " for vertex: " + logIdentifier);
     setVertexLocationHint(vertexLocationHint);
     writeLock.lock();
+    
     try {
       if (parallelismSet == true) {
         String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; 
@@ -1262,6 +1287,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         throw new TezUncheckedException(msg);
       }
 
+      if (fromVertexManager && canInitVertex()) {
+        // vertex is fully defined. setParallelism has been called. VertexManager should have 
+        // informed us about this. Otherwise we would have notified listeners that we are fully 
+        // defined before we are actually fully defined
+        Preconditions.checkState(vertexToBeReconfiguredByManager, "Vertex is fully configured but still"
+            + " the reconfiguration API has been called. VertexManager must notify the framework using " 
+            + " context.vertexReconfigurationPlanned() before re-configuring the vertex.");
+      }
+      
       parallelismSet = true;
 
       // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
@@ -1421,6 +1455,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  @Override
   public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
     writeLock.lock();
     try {
@@ -1432,6 +1467,46 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       writeLock.unlock();
     }
   }
+  
+  @Override
+  public void vertexReconfigurationPlanned() {
+    vertexReconfigurationPlanned(false);
+  }
+  
+  public void vertexReconfigurationPlanned(boolean testOverride) {
+    writeLock.lock();
+    try {
+      if (testOverride) {
+        Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(),
+            "test should override only failed cases");
+      } else {
+        Preconditions.checkState(!vmIsInitialized.get(),
+            "context.vertexReconfigurationPlanned() cannot be called after initialize()");
+        Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
+            + " cannot be invoked after the vertex has been configured.");
+      }
+      this.vertexToBeReconfiguredByManager = true;
+    } finally {
+      writeLock.unlock();
+    }      
+  }
+
+  @Override
+  public void doneReconfiguringVertex() {
+    writeLock.lock();
+    try {
+      Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+          + "invoked only after vertexReconfigurationPlanned() is invoked");
+      this.vertexToBeReconfiguredByManager = false;
+      if (completelyConfiguredSent.compareAndSet(false, true)) {
+        // vertex already started and at that time this event was not sent. Send now.
+        stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
+            org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+      }
+    } finally {
+      writeLock.unlock();
+    }    
+  }
 
   @Override
   /**
@@ -1970,6 +2045,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     try {
       vertexManager.initialize();
+      vmIsInitialized.set(true);
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
       LOG.error(msg, e);
@@ -2042,7 +2118,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               .getVertexManagerPlugin());
       LOG.info("Setting user vertex manager plugin: "
           + pluginDesc.getClassName() + " on vertex: " + getName());
-      vertexManager = new VertexManager(pluginDesc, this, appContext);
+      vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier);
     } else {
       // Intended order of picking a vertex manager
       // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703
@@ -2055,26 +2131,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
-            this, appContext);
+            this, appContext, stateChangeNotifier);
       } else if (hasOneToOne && !hasCustom) {
         LOG.info("Setting vertexManager to InputReadyVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
-            this, appContext);
+            this, appContext, stateChangeNotifier);
       } else if (hasBipartite && !hasCustom) {
         LOG.info("Setting vertexManager to ShuffleVertexManager for "
             + logIdentifier);
         // shuffle vertex manager needs a conf payload
         vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
-            this, appContext);
+            this, appContext, stateChangeNotifier);
       } else {
         // schedule all tasks upon vertex start. Default behavior.
         LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
-            this, appContext);
+            this, appContext, stateChangeNotifier);
       }
     }
   }
@@ -2222,7 +2298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   new TaskEventRecoverTask(task.getTaskId()));
             }
             try {
-              vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+              vertex.recoveryCodeSimulatingStart();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -2274,7 +2350,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                         taskState));
               }
               try {
-                vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+                vertex.recoveryCodeSimulatingStart();
                 endState = VertexState.RUNNING;
               } catch (AMUserCodeException e) {
                 String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2335,6 +2411,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
   }
+  
+  private void recoveryCodeSimulatingStart() throws AMUserCodeException {
+    vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+    // This code is duplicated from startVertex() because recovery does not follow normal
+    // transitions. To be removed after recovery code is fixed.
+    maybeSendConfiguredEvent();
+  }
 
   private void routeRecoveredEvents(VertexState vertexState,
       List<TezEvent> tezEvents) {
@@ -2556,7 +2639,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           boolean successSetParallelism ;
           try {
             vertex.setParallelism(0,
-              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true);
+              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
             successSetParallelism = true;
           } catch (Exception e) {
             successSetParallelism = false;
@@ -2614,7 +2697,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           try {
             vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
-              vertex.recoveredRootInputSpecUpdates, true);
+              vertex.recoveredRootInputSpecUpdates, true, false);
             successSetParallelism = true;
           } catch (Exception e) {
             successSetParallelism = false;
@@ -2634,7 +2717,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   new TaskEventRecoverTask(task.getTaskId()));
             }
             try {
-              vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+              vertex.recoveryCodeSimulatingStart();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -2674,7 +2757,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
             // Wait for all tasks to recover and report back
             try {
-              vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+              vertex.recoveryCodeSimulatingStart();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2825,10 +2908,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
+        // this block must always return VertexState.INITIALIZING
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
             + " to set #tasks for the vertex " + vertex.getVertexId());
 
         if (vertex.inputsWithInitializers != null) {
+          LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
           vertex.setupInputInitializerManager();
           return VertexState.INITIALIZING;
         } else {
@@ -2857,8 +2942,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       } else {
         LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
         vertex.createTasks();
-
+        // this block may return VertexState.INITIALIZING
         if (vertex.inputsWithInitializers != null) {
+          LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
           vertex.setupInputInitializerManager();
           return VertexState.INITIALIZING;
         }
@@ -2867,6 +2953,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           return VertexState.INITIALIZING;
         }
         LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
+        // vertex is completely configured. Send out notification now.
+        vertex.maybeSendConfiguredEvent();
         boolean isInitialized = vertex.initializeVertex();
         if (isInitialized) {
           return VertexState.INITED;
@@ -3020,7 +3108,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
       try {
-        vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
+        vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
       } catch (Exception e) {
         // ingore this exception, should not happen
         LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
@@ -3130,8 +3218,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return vertex.startVertex();
     }
   }
+  
+  private void maybeSendConfiguredEvent() {
+    // the vertex is fully configured by the time it starts. Always notify completely configured
+    // unless the vertex manager has told us that it is going to reconfigure it further
+    Preconditions.checkState(canInitVertex());
+    if (!this.vertexToBeReconfiguredByManager) {
+      // this vertex will not be reconfigured by its manager
+      if (completelyConfiguredSent.compareAndSet(false, true)) {
+        stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
+            org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+      }
+    }    
+  }
 
   private VertexState startVertex() {
+    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+    // IMPORTANT - Until Recovery is fixed to use normal state transitions, if any code is added 
+    // here then please check if it needs to be duplicated in recoveryCodeSimulatingStart().
+    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     Preconditions.checkState(getState() == VertexState.INITED,
         "Vertex must be inited " + logIdentifier);
 
@@ -3147,7 +3252,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     pendingReportedSrcCompletions.clear();
     logJobHistoryVertexStartedEvent();
-
+    
+    // the vertex is fully configured by the time it starts. Always notify completely configured
+    // unless the vertex manager has told us that it is going to reconfigure it further.
+    // If the vertex was pre-configured then the event would have been sent out earlier. Calling again 
+    // would be a no-op. If the vertex was not fully configured and waiting for that to complete then
+    // we would start immediately after that. Either parallelism updated (now) or IPO changed (future) 
+    // or vertex added (future). Simplify these cases by sending the event now automatically for the 
+    // user as if they had invoked the planned()/done() APIs.
+    maybeSendConfiguredEvent();
+    
     // TODO: Metrics
     //job.metrics.runningJob(job);
 
@@ -3309,6 +3423,36 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private static class VertexManagerUserCodeErrorTransition implements
+    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventManagerUserCodeError errEvent = ((VertexEventManagerUserCodeError) event);
+      AMUserCodeException e = errEvent.getError();
+      String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
+      LOG.error(msg, e);
+      
+      if (vertex.getState() == VertexState.RECOVERING) {
+        LOG.info("Received a user code error during recovering, setting recovered"
+            + " state to FAILED");
+        vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
+        vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE;
+        vertex.recoveredState = VertexState.FAILED;
+        return VertexState.RECOVERING;
+      } else if (vertex.getState() == VertexState.RUNNING) {
+        vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
+        vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
+            TaskTerminationCause.AM_USERCODE_FAILURE);
+        return VertexState.TERMINATING;
+      } else {
+        vertex.finished(VertexState.FAILED,
+            VertexTerminationCause.AM_USERCODE_FAILURE, msg
+              + ", " + ExceptionUtils.getStackTrace(e.getCause()));
+        return VertexState.FAILED;
+      }
+    }
+  }
+  
   /**
    * Here, the Vertex is being told that one of it's source task-attempts
    * completed.

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 1bfb0f9..dd38c2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
@@ -46,11 +47,16 @@ import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
+import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
@@ -75,18 +81,27 @@ public class VertexManager {
   UserPayload payload = null;
   AppContext appContext;
   BlockingQueue<TezEvent> rootInputInitEventQueue;
+  StateChangeNotifier stateChangeNotifier;
 
   private static final Log LOG = LogFactory.getLog(VertexManager.class);
 
-  class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
-    // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
+  class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener {
 
     private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
         managedVertex.getName(), "NULL_VERTEX", null);
     private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
+    private final List<String> notificationRegisteredVertices = Lists.newArrayList();
+    AtomicBoolean isComplete = new AtomicBoolean(false);
 
+    private void checkAndThrowIfDone() {
+      if (isComplete()) {
+        throw new TezUncheckedException("Cannot invoke context methods after reporting done");
+      }
+    }
+    
     @Override
-    public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
+    public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() {
+      checkAndThrowIfDone();
       // TODO Something similar for Initial Inputs - payload etc visible
       Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
       Map<String, EdgeProperty> vertexEdgeMap =
@@ -98,22 +113,25 @@ public class VertexManager {
     }
 
     @Override
-    public String getVertexName() {
+    public synchronized String getVertexName() {
+      checkAndThrowIfDone();
       return managedVertex.getName();
     }
 
     @Override
-    public int getVertexNumTasks(String vertexName) {
+    public synchronized int getVertexNumTasks(String vertexName) {
+      checkAndThrowIfDone();
       return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
     }
 
     @Override
-    public void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+    public synchronized void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
         Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
         Map<String, InputSpecUpdate> rootInputSpecUpdate) {
+      checkAndThrowIfDone();
       try {
         managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
-            rootInputSpecUpdate);
+            rootInputSpecUpdate, true);
       } catch (AMUserCodeException e) {
         // workaround: convert it to TezUncheckedException which would be caught in VM
         throw new TezUncheckedException(e);
@@ -121,13 +139,15 @@ public class VertexManager {
     }
 
     @Override
-    public void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+    public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+      checkAndThrowIfDone();
       managedVertex.scheduleTasks(tasks);
     }
 
     @Nullable
     @Override
-    public Set<String> getVertexInputNames() {
+    public synchronized Set<String> getVertexInputNames() {
+      checkAndThrowIfDone();
       Set<String> inputNames = null;
       Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
           inputs = managedVertex.getAdditionalInputs();
@@ -138,13 +158,15 @@ public class VertexManager {
     }
 
     @Override
-    public UserPayload getUserPayload() {
+    public synchronized UserPayload getUserPayload() {
+      checkAndThrowIfDone();
       return payload;
     }
 
     @Override
-    public void addRootInputEvents(final String inputName,
+    public synchronized void addRootInputEvents(final String inputName,
         Collection<InputDataInformationEvent> events) {
+      checkAndThrowIfDone();
       verifyIsRootInput(inputName);
       Collection<TezEvent> tezEvents = Collections2.transform(events,
           new Function<InputDataInformationEvent, TezEvent>() {
@@ -166,13 +188,15 @@ public class VertexManager {
 
 
     @Override
-    public void setVertexLocationHint(VertexLocationHint locationHint) {
+    public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {
+      checkAndThrowIfDone();
       Preconditions.checkNotNull(locationHint, "locationHint is null");
       managedVertex.setVertexLocationHint(locationHint);
     }
 
     @Override
-    public int getDAGAttemptNumber() {
+    public synchronized int getDAGAttemptNumber() {
+      checkAndThrowIfDone();
       return appContext.getApplicationAttemptId().getAttemptId();
     }
 
@@ -192,22 +216,26 @@ public class VertexManager {
     }
 
     @Override
-    public Resource getVertexTaskResource() {
+    public synchronized Resource getVertexTaskResource() {
+      checkAndThrowIfDone();
       return managedVertex.getTaskResource();
     }
 
     @Override
-    public Resource getTotalAvailableResource() {
+    public synchronized Resource getTotalAvailableResource() {
+      checkAndThrowIfDone();
       return appContext.getTaskScheduler().getTotalResources();
     }
 
     @Override
-    public int getNumClusterNodes() {
+    public synchronized int getNumClusterNodes() {
+      checkAndThrowIfDone();
       return appContext.getTaskScheduler().getNumClusterNodes();
     }
 
     @Override
-    public Container getTaskContainer(String vertexName, Integer taskIndex) {
+    public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) {
+      checkAndThrowIfDone();
       Vertex vertex = appContext.getCurrentDAG().getVertex(vertexName);
       Task task = vertex.getTask(taskIndex.intValue());
       TaskAttempt attempt = task.getSuccessfulAttempt();
@@ -216,16 +244,82 @@ public class VertexManager {
       }
       return null;
     }
+
+    @Override
+    public synchronized void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
+      checkAndThrowIfDone();
+      synchronized(notificationRegisteredVertices) {
+        notificationRegisteredVertices.add(vertexName);
+      }
+      stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this);
+    }
+
+    private void unregisterForVertexStateUpdates() {
+      synchronized (notificationRegisteredVertices) {
+        for (String vertexName : notificationRegisteredVertices) {
+          stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
+        }
+
+      }
+    }
+    
+    boolean isComplete() {
+      return (isComplete.get() == true);
+    }
+
+    // TODO add @Override later, after TEZ-1714
+    public synchronized void vertexManagerDone() {
+      checkAndThrowIfDone();
+      LOG.info("Vertex Manager reported done for : " + managedVertex.getLogIdentifier());
+      this.isComplete.set(true);
+      unregisterForVertexStateUpdates();
+    }
+
+    @Override
+    public synchronized void vertexReconfigurationPlanned() {
+      checkAndThrowIfDone();
+      managedVertex.vertexReconfigurationPlanned();
+    }
+
+    @Override
+    public synchronized void doneReconfiguringVertex() {
+      checkAndThrowIfDone();
+      managedVertex.doneReconfiguringVertex();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public synchronized void onStateUpdated(VertexStateUpdate event) {
+      if (isComplete()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
+              event.getVertexState() +
+              " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete.");
+        }
+      } else {
+        try {
+          plugin.onVertexStateUpdated(event);
+        } catch (Exception e) {
+          // state change must be triggered via an event transition
+          appContext.getEventHandler().handle(
+              new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
+                  new AMUserCodeException(Source.VertexManager, e)));
+        }
+      }
+    }
+
   }
 
   public VertexManager(VertexManagerPluginDescriptor pluginDesc,
-      Vertex managedVertex, AppContext appContext) {
+      Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
     checkNotNull(pluginDesc, "pluginDesc is null");
     checkNotNull(managedVertex, "managedVertex is null");
     checkNotNull(appContext, "appContext is null");
+    checkNotNull(stateChangeNotifier, "notifier is null");
     this.pluginDesc = pluginDesc;
     this.managedVertex = managedVertex;
     this.appContext = appContext;
+    this.stateChangeNotifier = stateChangeNotifier;
     // don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
     this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
   }
@@ -242,7 +336,9 @@ public class VertexManager {
       payload = pluginDesc.getUserPayload();
     }
     try {
-      plugin.initialize(); 
+      if (!pluginContext.isComplete()) {
+        plugin.initialize();
+      }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }
@@ -265,7 +361,9 @@ public class VertexManager {
       }
     }
     try {
-      plugin.onVertexStarted(pluginCompletionsMap);
+      if (!pluginContext.isComplete()) {
+        plugin.onVertexStarted(pluginCompletionsMap);
+      }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }
@@ -276,7 +374,9 @@ public class VertexManager {
     String vertexName =
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
     try {
-      plugin.onSourceTaskCompleted(vertexName, taskId);
+      if (!pluginContext.isComplete()) {
+        plugin.onSourceTaskCompleted(vertexName, taskId);
+      }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }
@@ -284,7 +384,9 @@ public class VertexManager {
 
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException {
     try {
-      plugin.onVertexManagerEventReceived(vmEvent);
+      if (!pluginContext.isComplete()) {
+        plugin.onVertexManagerEventReceived(vmEvent);
+      }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }
@@ -293,7 +395,9 @@ public class VertexManager {
   public List<TezEvent> onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
     try {
-      plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+      if (!pluginContext.isComplete()) {
+        plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+      }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index f1961aa..d859ae0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -898,13 +898,10 @@ public class TestDAGImpl {
     dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
         null));
     dispatcher.await();
-    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
 
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
     LOG.info(v2.getTasks().size());
     Task t1= v2.getTask(0);
-    dispatcher.getEventHandler().handle(new TaskEvent(t1.getTaskId(), TaskEventType.T_SCHEDULE));
-    dispatcher.await();
     TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
 
     Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
@@ -947,7 +944,6 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     dispatcher.await();
     Task t1= v2.getTask(0);
     TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
@@ -977,7 +973,6 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     dispatcher.await();
 
     Task t1= v2.getTask(0);
@@ -1007,7 +1002,6 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     dispatcher.await();
 
     Task t1= v2.getTask(0);
@@ -1038,7 +1032,6 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     dispatcher.await();
 
     Task t1= v2.getTask(0);

http://git-wip-us.apache.org/repos/asf/tez/blob/bf8d7a5e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
new file mode 100644
index 0000000..6d071a7
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestImmediateStartVertexManager {
+  /** Checks that no tasks are scheduled at vertex start, and 4 are after sources are CONFIGURED. */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test (timeout=5000)
+  public void testBasic() {
+    HashMap<String, EdgeProperty> mockInputVertices = 
+        new HashMap<String, EdgeProperty>();
+    final String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    final String mockSrcVertexId2 = "Vertex2";
+    EdgeProperty eProp2 = EdgeProperty.create(mock(EdgeManagerPluginDescriptor.class),
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    final String mockSrcVertexId3 = "Vertex3";
+    EdgeProperty eProp3 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    // Name reported by the mock context as the vertex being managed.
+    final String mockManagedVertexId = "Vertex4";
+    // Register the three source vertices (one per edge property built above) as inputs.
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    mockInputVertices.put(mockSrcVertexId2, eProp2);
+    mockInputVertices.put(mockSrcVertexId3, eProp3);
+    // Mock plugin context: 4 tasks for the managed vertex, 2 for each source vertex.
+    final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(2);
+    // Holds the task indices of the most recent scheduleVertexTasks call (reset on every call).
+    final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          scheduledTasks.clear();
+          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
+          for (TaskWithLocationHint task : tasks) {
+            scheduledTasks.add(task.getTaskIndex());
+          }
+          return null;
+      }}).when(mockContext).scheduleVertexTasks(anyList());
+    // Case 1: starting the vertex before any source notification must not schedule tasks.
+    ImmediateStartVertexManager manager = new ImmediateStartVertexManager(mockContext);
+    manager.initialize();
+    manager.onVertexStarted(null);
+    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
+        VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
+        VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
+        VertexState.CONFIGURED));
+    verify(mockContext, times(1)).scheduleVertexTasks(anyList()); // exactly one call so far
+    Assert.assertEquals(4, scheduledTasks.size());
+
+    // Case 2: simulate a race where CONFIGURED arrives synchronously when the manager registers.
+    scheduledTasks.clear();
+    final ImmediateStartVertexManager raceManager = new ImmediateStartVertexManager(mockContext);
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        raceManager.onVertexStateUpdated(new VertexStateUpdate((String)invocation.getArguments()[0],
+            VertexState.CONFIGURED));
+        scheduledTasks.clear(); // drop any indices recorded so far
+        return null;
+    }}).when(mockContext).registerForVertexStateUpdates(anyString(), anySet());
+    raceManager.initialize();
+    raceManager.onVertexStarted(null);
+    verify(mockContext, times(2)).scheduleVertexTasks(anyList()); // cumulative: 1 earlier + 1 here
+    Assert.assertEquals(4, scheduledTasks.size());
+  }
+  
+}