You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/10 22:37:53 UTC

tez git commit: TEZ-1914. VertexManager logic should not run on the central dispatcher (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 48055300d -> f03546896


TEZ-1914. VertexManager logic should not run on the central dispatcher (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0354689
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0354689
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0354689

Branch: refs/heads/master
Commit: f03546896ed13bb605e8f39e738d4221de04bd22
Parents: 4805530
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 10 13:37:43 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 10 13:37:43 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/AsyncDispatcher.java  |   4 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  21 ++
 .../tez/dag/app/dag/event/CallableEvent.java    |  42 +++
 .../dag/app/dag/event/CallableEventType.java    |  25 ++
 .../event/VertexEventInputDataInformation.java  |  40 +++
 .../tez/dag/app/dag/event/VertexEventType.java  |   2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  85 +++--
 .../tez/dag/app/dag/impl/VertexManager.java     | 311 +++++++++++++++----
 .../app/dag/impl/CallableEventDispatcher.java   |  37 +++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  35 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  99 ++++--
 .../tez/dag/app/dag/impl/TestVertexManager.java |  82 +++--
 .../vertexmanager/InputReadyVertexManager.java  |   2 +-
 .../TestInputReadyVertexManager.java            |   6 -
 17 files changed, 646 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4da24a7..d617bee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1914. VertexManager logic should not run on the central dispatcher
   TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
   TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index c23d669..253db23 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -87,7 +86,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     this.eventDispatchers = Maps.newHashMap();
   }
 
-  Runnable createThread() {
+  public Runnable createThread() {
     return new Runnable() {
       @Override
       public void run() {
@@ -122,6 +121,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    // TODO TEZ-2049 remove YARN reference
     this.exitOnDispatchException =
         conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
           Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index f8086d0..5564809 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -37,6 +37,8 @@ import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 
 /**
  * Context interface for sharing information across components in Tez DAG
@@ -63,6 +65,8 @@ public interface AppContext {
   String getUser();
 
   DAG getCurrentDAG();
+  
+  ListeningExecutorService getExecService();
 
   void setDAG(DAG dag);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c7e1e83..5aca3cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -47,6 +47,8 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -163,6 +165,9 @@ import org.codehaus.jettison.json.JSONException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The Tez DAG Application Master.
@@ -254,6 +259,10 @@ public class DAGAppMaster extends AbstractService {
   private Path currentRecoveryDataDir;
   private Path tezSystemStagingDir;
   private FileSystem recoveryFS;
+  
+  private ExecutorService rawExecutor;
+  private ListeningExecutorService execService;
+  
   /**
    * set of already executed dag names.
    */
@@ -483,6 +492,10 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+    rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("App Shared Pool - " + "#%d").build());
+    execService = MoreExecutors.listeningDecorator(rawExecutor);
+
     initServices(conf);
     super.serviceInit(conf);
 
@@ -1261,6 +1274,11 @@ public class DAGAppMaster extends AbstractService {
         rLock.unlock();
       }
     }
+    
+    @Override
+    public ListeningExecutorService getExecService() {
+      return execService;
+    }
 
     @Override
     public Set<String> getAllDAGIDs() {
@@ -1677,6 +1695,7 @@ public class DAGAppMaster extends AbstractService {
     if (this.dagSubmissionTimer != null) {
       this.dagSubmissionTimer.cancel();
     }
+        
     stopServices();
 
     // Given pre-emption, we should delete tez scratch dir only if unregister is
@@ -1708,6 +1727,8 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+    execService.shutdownNow();
+
     super.serviceStop();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
new file mode 100644
index 0000000..e148fe8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+import com.google.common.util.concurrent.FutureCallback;
+
+public abstract class CallableEvent extends AbstractEvent<CallableEventType> implements
+    Callable<Void> {
+  private final FutureCallback<Void> callback;
+
+  public CallableEvent(FutureCallback<Void> callback) {
+    super(CallableEventType.CALLABLE);
+    this.callback = callback;
+  }
+
+  public FutureCallback<Void> getCallback() {
+    return callback;
+  }
+
+  @Override
+  public abstract Void call() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
new file mode 100644
index 0000000..e9e93b9
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+public enum CallableEventType {
+  
+  CALLABLE,
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
new file mode 100644
index 0000000..6b5cad5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class VertexEventInputDataInformation extends VertexEvent {
+
+  private final List<TezEvent> events;
+  
+  public VertexEventInputDataInformation(TezVertexID vertexId, List<TezEvent> events) {
+    super(vertexId, VertexEventType.V_INPUT_DATA_INFORMATION);
+    this.events = events;
+  }
+  
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 5eb4929..aa202a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -49,6 +49,8 @@ public enum VertexEventType {
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,
   V_ROOT_INPUT_FAILED,
+  
+  V_INPUT_DATA_INFORMATION,
 
   // Recover Event, Producer:DAG
   V_RECOVER,

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 149033c..aba20cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -252,15 +252,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
+            TaskEventType.T_SCHEDULE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
+            TaskEventType.T_SCHEDULE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
-    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
-        TaskEventType.T_SCHEDULE)
 
     // create the topology tables
     .installTopology();

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 865b182..05c3cc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Strings;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
@@ -114,6 +115,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
@@ -342,6 +344,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
           .addTransition(VertexState.INITIALIZING,
+              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
+                  VertexState.FAILED),
+              VertexEventType.V_INPUT_DATA_INFORMATION,
+              new InputDataInformationTransition())
+          .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITED, VertexState.FAILED),
               VertexEventType.V_READY_TO_INIT,
               new VertexInitializedTransition())
@@ -861,7 +868,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public int getTotalTasks() {
-    return numTasks;
+    readLock.lock();
+    try {
+      return numTasks;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -2175,7 +2187,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               .getVertexManagerPlugin());
       LOG.info("Setting user vertex manager plugin: "
           + pluginDesc.getClassName() + " on vertex: " + getLogIdentifier());
-      vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier);
+      vertexManager = new VertexManager(pluginDesc, dagUgi, this, appContext, stateChangeNotifier);
     } else {
       // Intended order of picking a vertex manager
       // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703
@@ -2188,26 +2200,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
-            this, appContext, stateChangeNotifier);
+            dagUgi, this, appContext, stateChangeNotifier);
       } else if (hasOneToOne && !hasCustom) {
         LOG.info("Setting vertexManager to InputReadyVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
-            this, appContext, stateChangeNotifier);
+            dagUgi, this, appContext, stateChangeNotifier);
       } else if (hasBipartite && !hasCustom) {
         LOG.info("Setting vertexManager to ShuffleVertexManager for "
             + logIdentifier);
         // shuffle vertex manager needs a conf payload
         vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
-            this, appContext, stateChangeNotifier);
+            dagUgi, this, appContext, stateChangeNotifier);
       } else {
         // schedule all tasks upon vertex start. Default behavior.
         LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
             VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
-            this, appContext, stateChangeNotifier);
+            dagUgi, this, appContext, stateChangeNotifier);
       }
     }
   }
@@ -3063,14 +3075,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexState state = vertex.getState();
       if (state == VertexState.INITIALIZING) {
         try {
-          List<TezEvent> inputInfoEvents =
-              vertex.vertexManager.onRootVertexInitialized(
-              liInitEvent.getInputName(),
-              vertex.getAdditionalInputs().get(liInitEvent.getInputName())
-                  .getIODescriptor(), liInitEvent.getEvents());
-          if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
-            VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
-          }
+          vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), vertex
+              .getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(),
+              liInitEvent.getEvents());
         } catch (AMUserCodeException e) {
             String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
             LOG.error(msg, e);
@@ -3087,10 +3094,35 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.rootInputInitializerManager.shutdown();
         vertex.rootInputInitializerManager = null;
       }
+      
+      // the return of these events from the VM will complete initialization and move into 
+      // INITED state if possible via InputDataInformationTransition
+
+      return vertex.getState();
+    }
+  }
+
+  public static class InputDataInformationTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
+      List<TezEvent> inputInfoEvents = iEvent.getEvents();
+      try {
+        if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
+          VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
+        }
+      } catch (AMUserCodeException e) {
+        String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
+        LOG.error(msg, e);
+        vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ","
+            + ExceptionUtils.getStackTrace(e.getCause()));
+        return VertexState.FAILED;
+      }
 
       // done. check if we need to do the initialization
-      if (vertex.getState() == VertexState.INITIALIZING &&
-          vertex.initWaitsForRootInitializers) {
+      if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
         if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
           // set the wait flag to false if all initializers are done
           vertex.initWaitsForRootInitializers = false;
@@ -4021,7 +4053,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
     getAdditionalInputs() {
-    return this.rootInputDescriptors;
+    readLock.lock();
+    try {
+      return this.rootInputDescriptors;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Nullable
@@ -4059,7 +4096,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public Map<Vertex, Edge> getInputVertices() {
-    return Collections.unmodifiableMap(this.sourceVertices);
+    readLock.lock();
+    try {
+      return Collections.unmodifiableMap(this.sourceVertices);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -4092,7 +4134,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   public Resource getTaskResource() {
-    return taskResource;
+    readLock.lock();
+    try {
+      return taskResource;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index da86151..af92348 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -33,6 +35,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ReflectionUtils;
@@ -54,6 +57,8 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
@@ -62,7 +67,6 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -72,18 +76,30 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
+@SuppressWarnings("unchecked")
 public class VertexManager {
-  VertexManagerPluginDescriptor pluginDesc;
-  VertexManagerPlugin plugin;
-  Vertex managedVertex;
-  VertexManagerPluginContextImpl pluginContext;
-  UserPayload payload = null;
-  AppContext appContext;
-  BlockingQueue<TezEvent> rootInputInitEventQueue;
-  StateChangeNotifier stateChangeNotifier;
+  final VertexManagerPluginDescriptor pluginDesc;
+  final UserGroupInformation dagUgi;
+  final VertexManagerPlugin plugin;
+  final Vertex managedVertex;
+  final VertexManagerPluginContextImpl pluginContext;
+  final UserPayload payload;
+  final AppContext appContext;
+  final BlockingQueue<TezEvent> rootInputInitEventQueue;
+  final StateChangeNotifier stateChangeNotifier;
+  
+  private final ListeningExecutorService execService;
+  private final LinkedBlockingQueue<VertexManagerEvent> eventQueue;
+  private final AtomicBoolean eventInFlight;
+  private final AtomicBoolean pluginFailed;
 
   private static final Log LOG = LogFactory.getLog(VertexManager.class);
+  private final VertexManagerCallback VM_CALLBACK = new VertexManagerCallback();
 
   class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener {
 
@@ -97,6 +113,9 @@ public class VertexManager {
       if (isComplete()) {
         throw new TezUncheckedException("Cannot invoke context methods after reporting done");
       }
+      if (pluginFailed.get()) {
+        throw new TezUncheckedException("Cannot invoke context methods after throwing an exception");
+      }
     }
     
     @Override
@@ -233,6 +252,7 @@ public class VertexManager {
       return appContext.getTaskScheduler().getNumClusterNodes();
     }
 
+    // TODO TEZ-2048. Remove this API
     @Override
     public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) {
       checkAndThrowIfDone();
@@ -287,41 +307,36 @@ public class VertexManager {
       managedVertex.doneReconfiguringVertex();
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public synchronized void onStateUpdated(VertexStateUpdate event) {
-      if (isComplete()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
-              event.getVertexState() +
-              " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete.");
-        }
-      } else {
-        try {
-          plugin.onVertexStateUpdated(event);
-        } catch (Exception e) {
-          // state change must be triggered via an event transition
-          appContext.getEventHandler().handle(
-              new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
-                  new AMUserCodeException(Source.VertexManager, e)));
-        }
-      }
+      enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event));
     }
 
   }
 
-  public VertexManager(VertexManagerPluginDescriptor pluginDesc,
+  public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi,
       Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
     checkNotNull(pluginDesc, "pluginDesc is null");
     checkNotNull(managedVertex, "managedVertex is null");
     checkNotNull(appContext, "appContext is null");
     checkNotNull(stateChangeNotifier, "notifier is null");
     this.pluginDesc = pluginDesc;
+    this.dagUgi = dagUgi;
     this.managedVertex = managedVertex;
     this.appContext = appContext;
     this.stateChangeNotifier = stateChangeNotifier;
     // don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
     this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
+    
+    pluginContext = new VertexManagerPluginContextImpl();
+    Preconditions.checkArgument(pluginDesc != null);
+    plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
+        new Class[] { VertexManagerPluginContext.class }, new Object[] { pluginContext });
+    payload = pluginDesc.getUserPayload();
+    execService = appContext.getExecService();
+    eventQueue = new LinkedBlockingQueue<VertexManagerEvent>();
+    eventInFlight = new AtomicBoolean(false);
+    pluginFailed = new AtomicBoolean(false);
   }
 
   public VertexManagerPlugin getPlugin() {
@@ -329,20 +344,57 @@ public class VertexManager {
   }
 
   public void initialize() throws AMUserCodeException {
-    pluginContext = new VertexManagerPluginContextImpl();
-    if (pluginDesc != null) {
-      plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
-          new Class[]{VertexManagerPluginContext.class}, new Object[]{pluginContext});
-      payload = pluginDesc.getUserPayload();
-    }
     try {
       if (!pluginContext.isComplete()) {
+        // TODO TEZ-2066 tracks moving this async; until then plugin.initialize() runs on the calling thread.
+        synchronized (VertexManager.this) { // same monitor VertexManagerEvent.call() holds around invoke()
+          plugin.initialize();
+        }
       }
     } catch (Exception e) {
       throw new AMUserCodeException(Source.VertexManager, e);
     }
   }
+  
+  private boolean pluginInvocationAllowed(String msg) { // gate for plugin calls; msg prefixes the debug log when refusing
+    if (pluginFailed.get()) { // flag is set in VertexManagerCallback.onFailure
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(msg + " . Manager failed. Vertex=" + managedVertex.getLogIdentifier());
+      }
+      return false;
+    }
+    if (pluginContext.isComplete()) { // the plugin context reports completion: nothing more to invoke
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(msg+ " . Manager complete. Not scheduling event. Vertex=" + managedVertex.getLogIdentifier());
+      }
+      return false;
+    }
+    return true;
+  }
+  
+  private void enqueueAndScheduleNextEvent(VertexManagerEvent e) { // queue one plugin callback, then try to submit
+    if (!pluginInvocationAllowed("Dropping event")) { // plugin failed or complete: the event is discarded
+      return;
+    }
+    eventQueue.add(e);
+    tryScheduleNextEvent(); // submits only when no other event is currently in flight
+  }
+  
+  private void tryScheduleNextEvent() { // submits at most one queued event; does nothing while another is in flight
+    if (!pluginInvocationAllowed("Not scheduling")) {
+      return;
+    }
+    if (eventQueue.isEmpty()) {
+      return;
+    }
+    if (eventInFlight.compareAndSet(false, true)) {
+      // CAS succeeded: no event was in flight, so this caller submits the next one
+      VertexManagerEvent e = eventQueue.poll();
+      Preconditions.checkState(e != null); // NOTE(review): if this throws, eventInFlight stays true
+      ListenableFuture<Void> future = execService.submit(e);
+      Futures.addCallback(future, e.getCallback()); // VertexManagerCallback.onSuccess clears eventInFlight and re-enters here
+    }
+  }
 
   public void onVertexStarted(List<TezTaskAttemptID> completions) throws AMUserCodeException {
     Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
@@ -360,53 +412,180 @@ public class VertexManager {
         taskIdList.add(taskId);
       }
     }
-    try {
-      if (!pluginContext.isComplete()) {
-        plugin.onVertexStarted(pluginCompletionsMap);
-      }
-    } catch (Exception e) {
-      throw new AMUserCodeException(Source.VertexManager, e);
-    }
+    enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(pluginCompletionsMap));
   }
 
   public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
     Integer taskId = Integer.valueOf(tezTaskId.getId());
     String vertexName =
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
-    try {
-      if (!pluginContext.isComplete()) {
-        plugin.onSourceTaskCompleted(vertexName, taskId);
-      }
-    } catch (Exception e) {
-      throw new AMUserCodeException(Source.VertexManager, e);
+    enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(taskId, vertexName)); // plugin call is queued, not made inline
+  }
+
+  public void onVertexManagerEventReceived( // queues the runtime event for delivery to the plugin
+      org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) throws AMUserCodeException { // fully qualified: the simple name is also the inner queue-event class
+    enqueueAndScheduleNextEvent(new VertexManagerEventReceived(vmEvent));
+  }
+
+  public void onRootVertexInitialized(String inputName, // now returns void; the plugin call is queued below
+      InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; enqueueing onRootVertexInitialized"
+          + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
     }
+    enqueueAndScheduleNextEvent(new VertexManagerEventRootInputInitialized(inputName,
+        inputDescriptor, events)); // resulting task events are posted later by this event's callback
   }
 
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException {
-    try {
-      if (!pluginContext.isComplete()) {
-        plugin.onVertexManagerEventReceived(vmEvent);
+  private class VertexManagerCallback implements FutureCallback<Void> { // completion handler for a submitted plugin callback
+
+    @Override
+    public void onFailure(Throwable t) {
+      // stop further event processing
+      pluginFailed.set(true);
+      eventQueue.clear();
+      // report the real root cause: a failure raised inside UGI.doAs can arrive wrapped
+      // in an UndeclaredThrowableException
+      if (t instanceof UndeclaredThrowableException) {
+        t = t.getCause();
       }
-    } catch (Exception e) {
-      throw new AMUserCodeException(Source.VertexManager, e);
+      Preconditions.checkState(appContext != null);
+      Preconditions.checkState(managedVertex != null);
+      // state change must be triggered via an event transition
+      appContext.getEventHandler().handle(
+          new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
+              new AMUserCodeException(Source.VertexManager, t)));
+      // eventInFlight is not reset here, so no further events are scheduled after a user code error
+    }
+    
+    @Override
+    public void onSuccess(Void result) {
+      Preconditions.checkState(eventInFlight.get()); // a completing event must have been marked in flight
+      eventInFlight.set(false);
+      tryScheduleNextEvent(); // chain to the next queued event, if any
     }
   }
+  
+  private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback { // also posts drained root-input events on success
 
-  public List<TezEvent> onRootVertexInitialized(String inputName,
-      InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
-    try {
-      if (!pluginContext.isComplete()) {
-        plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+    @Override
+    public void onSuccess(Void result) {
+      super.onSuccess(result); // NOTE(review): may submit the next queued event before the drain below; confirm ordering is intended
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("vertex:" + managedVertex.getLogIdentifier()
+            + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:" // NOTE(review): no input name follows; none is in scope here
+            + ", current task events size is " + rootInputInitEventQueue.size());
       }
-    } catch (Exception e) {
-      throw new AMUserCodeException(Source.VertexManager, e);
+      List<TezEvent> resultEvents = new ArrayList<TezEvent>();
+      rootInputInitEventQueue.drainTo(resultEvents); // move everything queued so far into resultEvents
+      appContext.getEventHandler().handle(
+          new VertexEventInputDataInformation(managedVertex.getVertexId(), resultEvents));
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; after call of VertexManagerPlugin.onRootVertexInitialized"
-          + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
+  }
+  
+  class VertexManagerEventOnVertexStateUpdate extends VertexManagerEvent { // queued form of plugin.onVertexStateUpdated
+    private final VertexStateUpdate event; // the update handed to the plugin in invoke()
+    
+    public VertexManagerEventOnVertexStateUpdate(VertexStateUpdate event) {
+      this.event = event;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexStateUpdated(event);
+    }
+    
+  }
+  
+  class VertexManagerEventOnVertexStarted extends VertexManagerEvent { // queued form of plugin.onVertexStarted
+    private final Map<String, List<Integer>> pluginCompletionsMap; // passed unchanged to the plugin in invoke()
+
+    public VertexManagerEventOnVertexStarted(Map<String, List<Integer>> pluginCompletionsMap) {
+      this.pluginCompletionsMap = pluginCompletionsMap;
+    }
+    
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexStarted(pluginCompletionsMap);
+    }
+    
+  }
+  
+  class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent { // queued form of plugin.onSourceTaskCompleted
+    private final Integer taskId;
+    private final String vertexName;
+    
+    public VertexManagerEventSourceTaskCompleted(Integer taskId, String vertexName) {
+      this.taskId = taskId;
+      this.vertexName = vertexName;
+    }
+    
+    @Override
+    public void invoke() throws Exception {
+      plugin.onSourceTaskCompleted(vertexName, taskId); // plugin takes (vertexName, taskId), the reverse of this constructor's order
     }
-    List<TezEvent> resultEvents = new ArrayList<TezEvent>();
-    rootInputInitEventQueue.drainTo(resultEvents);
-    return resultEvents;
+    
+  }
+  
+  class VertexManagerEventReceived extends VertexManagerEvent { // queued form of plugin.onVertexManagerEventReceived
+    private final org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent; // runtime event, fully qualified to avoid the inner-class name clash
+    
+    public VertexManagerEventReceived(org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) {
+      this.vmEvent = vmEvent;
+    }
+    
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexManagerEventReceived(vmEvent);
+    }
+    
+  }
+  
+  class VertexManagerEventRootInputInitialized extends VertexManagerEvent { // queued form of plugin.onRootVertexInitialized
+    private final String inputName;
+    private final InputDescriptor inputDescriptor;
+    private final List<Event> events;
+    
+    public VertexManagerEventRootInputInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) {
+      super(new VertexManagerRootInputInitializedCallback()); // own callback: on success it posts a VertexEventInputDataInformation
+      this.inputName = inputName;
+      this.inputDescriptor = inputDescriptor;
+      this.events = events;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+    }
+
+  }
+  
+  abstract class VertexManagerEvent extends CallableEvent { // base for queued plugin callbacks; subclasses implement invoke()
+    public VertexManagerEvent() {
+      this(VM_CALLBACK); // VM_CALLBACK is declared outside this hunk; presumably a shared VertexManagerCallback (confirm)
+    }
+    public VertexManagerEvent(VertexManagerCallback callback) {
+      super(callback);
+    }
+
+    @Override
+    public Void call() throws Exception {
+      final VertexManager manager = VertexManager.this;
+      manager.dagUgi.doAs(new PrivilegedExceptionAction<Void>() { // run under the identity given to the VertexManager constructor
+        @Override
+        public Void run() throws Exception {
+          synchronized (manager) { // same monitor initialize() takes around plugin.initialize()
+            if (manager.pluginInvocationAllowed("Not invoking")) { // re-check: state may have changed since the event was queued
+              invoke();
+            }
+          }
+          return null;
+        }
+      });
+      return null;
+    }
+
+    public abstract void invoke() throws Exception; // the single plugin call this event stands for
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
new file mode 100644
index 0000000..a81bd68
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+
+public class CallableEventDispatcher implements EventHandler<CallableEvent> { // test helper: runs the event inline and notifies its callback
+
+    @Override
+    public void handle(CallableEvent event) {
+      try {
+        event.call();
+        event.getCallback().onSuccess(null); // NOTE(review): an Exception thrown by onSuccess is also routed to onFailure below
+      } catch (Exception e) {
+        event.getCallback().onFailure(e);
+      }
+    }
+    
+  }
+  

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index cae9059..599f01e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -26,9 +27,10 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -58,7 +61,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -85,6 +87,8 @@ import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -125,8 +129,13 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.protobuf.ByteString;
 
 public class TestDAGImpl {
@@ -136,6 +145,7 @@ public class TestDAGImpl {
   private TezDAGID dagId;
   private static Configuration conf;
   private DrainDispatcher dispatcher;
+  private ListeningExecutorService execService;
   private Credentials fsTokens;
   private AppContext appContext;
   private ACLManager aclManager;
@@ -724,6 +734,7 @@ public class TestDAGImpl {
     MockDNSToSwitchMapping.initializeMockRackResolver();
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Before
   public void setup() {
     conf = new Configuration();
@@ -736,6 +747,19 @@ public class TestDAGImpl {
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
+    execService = mock(ListeningExecutorService.class);
+    final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+    
+    Mockito.doAnswer(new Answer() {
+      public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          CallableEvent e = (CallableEvent) args[0];
+          dispatcher.getEventHandler().handle(e);
+          return mockFuture;
+      }})
+    .when(execService).submit((Callable<Void>) any());
+    
+    doReturn(execService).when(appContext).getExecService();
     historyEventHandler = mock(HistoryEventHandler.class);
     aclManager = new ACLManager("amUser");
     doReturn(conf).when(appContext).getAMConf();
@@ -750,6 +774,7 @@ public class TestDAGImpl {
     doReturn(dag).when(appContext).getCurrentDAG();
     mrrAppContext = mock(AppContext.class);
     doReturn(aclManager).when(mrrAppContext).getAMACLManager();
+    doReturn(execService).when(mrrAppContext).getExecService();
     mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
     mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
@@ -763,6 +788,7 @@ public class TestDAGImpl {
     doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
     groupAppContext = mock(AppContext.class);
     doReturn(aclManager).when(groupAppContext).getAMACLManager();
+    doReturn(execService).when(groupAppContext).getExecService();
     groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
     groupDagPlan = createGroupDAGPlan();
     groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
@@ -778,6 +804,7 @@ public class TestDAGImpl {
 
     // reset totalCommitCounter to 0
     TotalCountingOutputCommitter.totalCommitCounter = 0;
+    dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -797,6 +824,7 @@ public class TestDAGImpl {
   public void teardown() {
     dispatcher.await();
     dispatcher.stop();
+    execService.shutdownNow();
     dagPlan = null;
     dag = null;
   }
@@ -817,6 +845,7 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
     doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
+    doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService();
     doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
     doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
@@ -838,7 +867,7 @@ public class TestDAGImpl {
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, impl.getState());
   }
-
+  
   @Test(timeout = 5000)
   public void testDAGInit() {
     initDAG(dag);

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 83a3a8a..2c6f5e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -116,6 +117,8 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
@@ -176,10 +179,13 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.internal.util.collections.Sets;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -187,6 +193,7 @@ import org.mockito.stubbing.Answer;
 public class TestVertexImpl {
 
   private static final Log LOG = LogFactory.getLog(TestVertexImpl.class);
+  private ListeningExecutorService execService;
 
   private boolean useCustomInitializer = false;
   private InputInitializer customInitializer = null;
@@ -2119,6 +2126,7 @@ public class TestVertexImpl {
         anyInt());
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public void setupPostDagCreation() throws AMUserCodeException {
     String dagName = "dag0";
     dispatcher = new DrainDispatcher();
@@ -2138,6 +2146,19 @@ public class TestVertexImpl {
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
     doReturn(dag).when(appContext).getCurrentDAG();
+    execService = mock(ListeningExecutorService.class);
+    final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+    
+    Mockito.doAnswer(new Answer() {
+      public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          CallableEvent e = (CallableEvent) args[0];
+          dispatcher.getEventHandler().handle(e);
+          return mockFuture;
+      }})
+    .when(execService).submit((Callable<Void>) any());
+    
+    doReturn(execService).when(appContext).getExecService();
     doReturn(conf).when(appContext).getAMConf();
     doReturn(new Credentials()).when(dag).getCredentials();
     doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan();
@@ -2191,6 +2212,7 @@ public class TestVertexImpl {
       edge.initialize();
     }
 
+    dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
     taskEventDispatcher = new TaskEventDispatcher();
@@ -2224,6 +2246,7 @@ public class TestVertexImpl {
       dispatcher.await();
       dispatcher.stop();
     }
+    execService.shutdownNow();
     dispatcher = null;
     vertexEventDispatcher = null;
     dagEventDispatcher = null;
@@ -2247,7 +2270,6 @@ public class TestVertexImpl {
     }
   }
 
-
   @SuppressWarnings("unchecked")
   private void initVertex(VertexImpl v) {
     Assert.assertEquals(VertexState.NEW, v.getState());
@@ -2382,6 +2404,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(0, listener.events.size()); // configured event not sent
     startVertex(v1, true);
+    dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v2.getState());
     Assert.assertEquals(1, listener.events.size()); // configured event sent after VM
     Assert.assertEquals("vertex2", listener.events.get(0).getVertexName());
@@ -2821,12 +2844,13 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexTaskAttemptProcessorFailure() {
+  public void testVertexTaskAttemptProcessorFailure() throws Exception {
     initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex1");
 
     startVertex(v);
+    dispatcher.await();
     TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
     ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
     
@@ -2856,12 +2880,13 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexTaskAttemptInputFailure() {
+  public void testVertexTaskAttemptInputFailure() throws Exception {
     initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex1");
 
     startVertex(v);
+    dispatcher.await();
     TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
     ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
     
@@ -2892,12 +2917,13 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexTaskAttemptOutputFailure() {
+  public void testVertexTaskAttemptOutputFailure() throws Exception {
     initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex1");
 
     startVertex(v);
+    dispatcher.await();
     TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
     ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
     
@@ -3355,7 +3381,7 @@ public class TestVertexImpl {
 
 
   @Test(timeout = 5000)
-  public void testVertexWithOneToOneSplit() throws AMUserCodeException {
+  public void testVertexWithOneToOneSplit() throws Exception {
     // create a diamond shaped dag with 1-1 edges. 
     // split the source and remaining vertices should split equally
     // vertex with 2 incoming splits from the same source should split once
@@ -3386,7 +3412,7 @@ public class TestVertexImpl {
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
     initializerManager1.completeInputInitialization(0, numTasks, v1Hints);
-
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(numTasks, v1.getTotalTasks());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
@@ -3437,10 +3463,10 @@ public class TestVertexImpl {
     // fudge vertex manager so that tasks dont start running
     v1.vertexManager = new VertexManager(
         VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
-        v1, appContext, mock(StateChangeNotifier.class));
+        UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class));
     v1.vertexManager.initialize();
     startVertex(v1);
-    
+    dispatcher.await();
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
     Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
@@ -3476,7 +3502,7 @@ public class TestVertexImpl {
     // fudge vertex manager so that tasks dont start running
     v1.vertexManager = new VertexManager(
         VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
-        v1, appContext, mock(StateChangeNotifier.class));
+        UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class));
     v1.vertexManager.initialize();
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -3519,7 +3545,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexWithInitializerFailure() throws AMUserCodeException {
+  public void testVertexWithInitializerFailure() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -3548,7 +3574,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
     initializerManager2.failInputInitialization();
-    
+    dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
         .getVertexManager().getPlugin().getClass().getName());
@@ -4400,7 +4426,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexWithMultipleInitializers1() throws AMUserCodeException {
+  public void testVertexWithMultipleInitializers1() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4419,15 +4445,17 @@ public class TestVertexImpl {
 
     // Complete initializer which sets parallelism first
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
 
     // Complete second initializer
     initializerManager1.completeInputInitialization(1);
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
   }
 
   @Test(timeout = 5000)
-  public void testVertexWithMultipleInitializers2() throws AMUserCodeException {
+  public void testVertexWithMultipleInitializers2() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4446,16 +4474,18 @@ public class TestVertexImpl {
 
     // Complete initializer which does not set parallelism
     initializerManager1.completeInputInitialization(1);
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
 
     // Complete second initializer which sets parallelism
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
   }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 500000)
-  public void testVertexWithInitializerSuccess() throws AMUserCodeException {
+  public void testVertexWithInitializerSuccess() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4470,7 +4500,7 @@ public class TestVertexImpl {
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
-
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
     // task events get buffered
@@ -4510,7 +4540,7 @@ public class TestVertexImpl {
     RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
     initializerManager2.completeInputInitialization(0, 10, v2Hints);
-    
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
     // task events get buffered
@@ -4530,7 +4560,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexWithInputDistributor() throws AMUserCodeException {
+  public void testVertexWithInputDistributor() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
@@ -4547,6 +4577,7 @@ public class TestVertexImpl {
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     byte[] payload = new byte[0];
     initializerManager1.completeInputDistribution(payload);
+    dispatcher.await();
     // edge is still null so its initializing
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     Assert.assertEquals(true, initializerManager1.hasShutDown);
@@ -4565,7 +4596,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexRootInputSpecUpdateAll() throws AMUserCodeException {
+  public void testVertexRootInputSpecUpdateAll() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4580,7 +4611,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
     RootInputInitializerManagerControlled initializerManager1 = v3.getRootInputInitializerManager();
     initializerManager1.completeInputInitialization();
-
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v3.getState());
     Assert.assertEquals(expectedNumTasks, v3.getTotalTasks());
     Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v3.getVertexManager()
@@ -4596,7 +4627,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexRootInputSpecUpdatePerTask() throws AMUserCodeException {
+  public void testVertexRootInputSpecUpdatePerTask() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4611,7 +4642,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v4.getState());
     RootInputInitializerManagerControlled initializerManager1 = v4.getRootInputInitializerManager();
     initializerManager1.completeInputInitialization();
-
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v4.getState());
     Assert.assertEquals(expectedNumTasks, v4.getTotalTasks());
     Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v4.getVertexManager()
@@ -4969,7 +5000,7 @@ public class TestVertexImpl {
     // fudge the vm so we can do custom stuff
     vB.vertexManager = new VertexManager(
         VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
-        vB, appContext, mock(StateChangeNotifier.class));
+        UserGroupInformation.getCurrentUser(), vB, appContext, mock(StateChangeNotifier.class));
     
     vB.vertexReconfigurationPlanned();
     
@@ -5093,7 +5124,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnRootVertexInitialized() throws AMUserCodeException {
+  public void testExceptionFromVM_OnRootVertexInitialized() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnRootVertexInitialized);
@@ -5118,7 +5149,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnVertexStarted() throws AMUserCodeException {
+  public void testExceptionFromVM_OnVertexStarted() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexStarted);
@@ -5136,7 +5167,7 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
         VertexEventType.V_START));
     dispatcher.await();
-
+    
     Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass());
     Assert.assertEquals(VertexState.FAILED, v1.getState());
     String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
@@ -5146,7 +5177,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnSourceTaskCompleted() throws AMUserCodeException {
+  public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnSourceTaskCompleted);
@@ -5183,7 +5214,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnVertexManagerEventReceived() throws AMUserCodeException {
+  public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexManagerEventReceived);
@@ -5210,7 +5241,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws AMUserCodeException {
+  public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VMExceptionLocation.OnVertexManagerVertexStateUpdated);
@@ -5228,7 +5259,7 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v2.getState());
     startVertex(v1, false);
-
+    dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
     assertTrue(diagnostics.contains(VMExceptionLocation.OnVertexManagerVertexStateUpdated.name()));
@@ -5261,7 +5292,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException {
+  public void testExceptionFromII_InitFailedAfterInitialized() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithIIException();
@@ -5272,6 +5303,7 @@ public class TestVertexImpl {
     initVertex(v1);
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     initializerManager1.completeInputInitialization(0);
+    dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
     String errorMsg = "ErrorWhenInitFailureAtInited";
     dispatcher.getEventHandler().handle(new VertexEventRootInputFailed(v1.getVertexId(), "input1",
@@ -5285,7 +5317,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException {
+  public void testExceptionFromII_InitFailedAfterRunning() throws Exception {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithIIException();
@@ -5296,6 +5328,7 @@ public class TestVertexImpl {
     initVertex(v1);
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     initializerManager1.completeInputInitialization(0);
+    dispatcher.await();
     startVertex(v1);
     Assert.assertEquals(VertexState.RUNNING, v1.getState());
     String errorMsg = "ErrorWhenInitFailureAtRunning";
@@ -5310,7 +5343,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException {
+  public void testExceptionFromII_HandleInputInitializerEvent() throws Exception {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
     EventHandlingRootInputInitializer initializer =
@@ -5351,7 +5384,7 @@ public class TestVertexImpl {
     dispatcher.getEventHandler()
         .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
     dispatcher.await();
-
+    
     // it would cause v2 fail as its II throw exception in handleInputInitializerEvent
     String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
     assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name()));

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 73dc5eb..81cb42a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -23,6 +23,8 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -31,7 +33,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -39,38 +44,73 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class TestVertexManager {
-
-  @Test(timeout = 5000)
-  public void testOnRootVertexInitialized() throws Exception {
-    Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
-    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+  AppContext mockAppContext;
+  ListeningExecutorService execService;
+  Vertex mockVertex;
+  EventHandler mockHandler;
+  ArgumentCaptor<VertexEventInputDataInformation> requestCaptor;
+  
+  @Before
+  public void setup() {
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    execService = mock(ListeningExecutorService.class);
+    final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+    Mockito.doAnswer(new Answer() {
+      public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          CallableEvent e = (CallableEvent) args[0];
+          new CallableEventDispatcher().handle(e);
+          return mockFuture;
+      }})
+    .when(execService).submit((Callable<Void>) any());
+    doReturn(execService).when(mockAppContext).getExecService();
+    mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
     doReturn("vertex1").when(mockVertex).getName();
+    mockHandler = mock(EventHandler.class);
+    when(mockAppContext.getEventHandler()).thenReturn(mockHandler);
     when(
         mockAppContext.getCurrentDAG().getVertex(any(String.class))
             .getTotalTasks()).thenReturn(1);
+    requestCaptor = ArgumentCaptor.forClass(VertexEventInputDataInformation.class);
 
+  }
+  
+  @Test(timeout = 5000)
+  public void testOnRootVertexInitialized() throws Exception {
     VertexManager vm =
         new VertexManager(
             VertexManagerPluginDescriptor.create(RootInputVertexManager.class
-                .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+                .getName()), UserGroupInformation.getCurrentUser(), 
+                mockVertex, mockAppContext, mock(StateChangeNotifier.class));
     vm.initialize();
     InputDescriptor id1 = mock(InputDescriptor.class);
     List<Event> events1 = new LinkedList<Event>();
     InputDataInformationEvent diEvent1 =
         InputDataInformationEvent.createWithSerializedPayload(0, null);
     events1.add(diEvent1);
-    List<TezEvent> tezEvents1 =
-        vm.onRootVertexInitialized("input1", id1, events1);
+    vm.onRootVertexInitialized("input1", id1, events1);
+    verify(mockHandler, times(1)).handle(requestCaptor.capture());
+    List<TezEvent> tezEvents1 = requestCaptor.getValue().getEvents();
     assertEquals(1, tezEvents1.size());
     assertEquals(diEvent1, tezEvents1.get(0).getEvent());
 
@@ -79,8 +119,9 @@ public class TestVertexManager {
     InputDataInformationEvent diEvent2 =
         InputDataInformationEvent.createWithSerializedPayload(0, null);
     events2.add(diEvent2);
-    List<TezEvent> tezEvents2 =
-        vm.onRootVertexInitialized("input1", id2, events2);
+    vm.onRootVertexInitialized("input1", id2, events2);
+    verify(mockHandler, times(2)).handle(requestCaptor.capture());
+    List<TezEvent> tezEvents2 = requestCaptor.getValue().getEvents();
     assertEquals(tezEvents2.size(), 1);
     assertEquals(diEvent2, tezEvents2.get(0).getEvent());
   }
@@ -92,17 +133,11 @@ public class TestVertexManager {
    */
   @Test(timeout = 5000)
   public void testOnRootVertexInitialized2() throws Exception {
-    Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
-    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    doReturn("vertex1").when(mockVertex).getName();
-    when(
-        mockAppContext.getCurrentDAG().getVertex(any(String.class))
-            .getTotalTasks()).thenReturn(1);
-
     VertexManager vm =
         new VertexManager(
             VertexManagerPluginDescriptor.create(CustomVertexManager.class
-                .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+                .getName()), UserGroupInformation.getCurrentUser(),
+                mockVertex, mockAppContext, mock(StateChangeNotifier.class));
     vm.initialize();
     InputDescriptor id1 = mock(InputDescriptor.class);
     List<Event> events1 = new LinkedList<Event>();
@@ -111,17 +146,20 @@ public class TestVertexManager {
     events1.add(diEvent1);
 
     // do not call context.addRootInputEvents, just cache the TezEvent
-    List<TezEvent> tezEventsAfterInput1 = vm.onRootVertexInitialized("input1", id1, events1);
+    vm.onRootVertexInitialized("input1", id1, events1);
+    verify(mockHandler, times(1)).handle(requestCaptor.capture());
+    List<TezEvent> tezEventsAfterInput1 = requestCaptor.getValue().getEvents();
     assertEquals(0, tezEventsAfterInput1.size());
-
+    
     InputDescriptor id2 = mock(InputDescriptor.class);
     List<Event> events2 = new LinkedList<Event>();
     InputDataInformationEvent diEvent2 =
         InputDataInformationEvent.createWithSerializedPayload(0, null);
     events2.add(diEvent2);
     // call context.addRootInputEvents(input1), context.addRootInputEvents(input2)
-    List<TezEvent> tezEventsAfterInput2 =
-        vm.onRootVertexInitialized("input2", id2, events2);
+    vm.onRootVertexInitialized("input2", id2, events2);
+    verify(mockHandler, times(2)).handle(requestCaptor.capture());
+    List<TezEvent> tezEventsAfterInput2 = requestCaptor.getValue().getEvents();
     assertEquals(2, tezEventsAfterInput2.size());
 
     // also verify the EventMetaData

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index e2e9dd3..bbe9dcf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -170,7 +170,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
     numConfiguredSources++;
     int target = getContext().getInputVertexEdgeProperties().size();
-    LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: "
+    LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
         + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
         + " needed: " + target);
     Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName());

http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 8de747d..411ea71 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -243,12 +243,6 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
-    when(mockContext.getTaskContainer(mockSrcVertexId2, 0)).thenReturn(mockContainer2);
-    when(mockContext.getTaskContainer(mockSrcVertexId2, 1)).thenReturn(mockContainer2);
-    when(mockContext.getTaskContainer(mockSrcVertexId2, 2)).thenReturn(mockContainer2);
-    when(mockContext.getTaskContainer(mockSrcVertexId3, 0)).thenReturn(mockContainer3);
-    when(mockContext.getTaskContainer(mockSrcVertexId3, 1)).thenReturn(mockContainer3);
-    when(mockContext.getTaskContainer(mockSrcVertexId3, 2)).thenReturn(mockContainer3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     mockInputVertices.put(mockSrcVertexId2, eProp2);
     mockInputVertices.put(mockSrcVertexId3, eProp3);