You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:19 UTC

[10/50] [abbrv] tez git commit: TEZ-1703. Exception handling for InputInitializer. (zjffdu)

TEZ-1703. Exception handling for InputInitializer. (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ec29425
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ec29425
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ec29425

Branch: refs/heads/TEZ-8
Commit: 4ec29425d63decfd8de4e1528f043271cf7cb3b2
Parents: 7f8fc75
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Oct 31 09:55:22 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Oct 31 09:55:22 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/runtime/api/InputInitializer.java       |   3 +-
 .../app/dag/RootInputInitializerManager.java    |  30 +++-
 .../dag/event/VertexEventRootInputFailed.java   |   7 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  52 ++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 178 ++++++++++++++++++-
 .../tez/test/TestExceptionPropagation.java      | 138 +++++++++++---
 8 files changed, 362 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b2dc65..61b380e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -92,6 +92,7 @@ ALL CHANGES:
   TEZ-1716. Additional ATS data for UI.
   TEZ-1722. DAG should be related to Application Id in ATS data.
   TEZ-1711. Don't cache outputSpecList in VertexImpl.getOutputSpecList(taskIndex)
+  TEZ-1703. Exception handling for InputInitializer.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 7b22b62..d9d6517 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -101,7 +101,8 @@ public abstract class InputInitializer {
    * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
    *                    Additional information may be available for specific events, Look at the
    *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   * @throws Exception
    */
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 1f7a83f..bdd3689 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -19,6 +19,8 @@
 package org.apache.tez.dag.app.dag;
 
 import javax.annotation.Nullable;
+
+import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.HashMap;
@@ -53,7 +55,9 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.Event;
@@ -275,12 +279,18 @@ public class RootInputInitializerManager {
     @SuppressWarnings("unchecked")
     @Override
     public void onFailure(Throwable t) {
+      // Unwrap the real root cause of the failure; an UndeclaredThrowableException
+      // would be thrown if using UGI.doAs
+      if (t instanceof UndeclaredThrowableException) {
+        t = t.getCause();
+      }
       initializer.setComplete();
       LOG.info(
           "Failed InputInitializer for Input: " + initializer.getInput().getName() +
               " on vertex " + initializer.getVertexLogIdentifier());
       eventHandler
-          .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(), t));
+          .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(),
+              new AMUserCodeException(Source.InputInitializer,t)));
     }
   }
 
@@ -294,6 +304,7 @@ public class RootInputInitializerManager {
     private final InputInitializerContext context;
     private final AtomicBoolean isComplete = new AtomicBoolean(false);
     private final String vertexLogIdentifier;
+    private final TezVertexID vertexId;
     private final StateChangeNotifier stateChangeNotifier;
     private final List<String> notificationRegisteredVertices = Lists.newArrayList();
     private final AppContext appContext;
@@ -306,6 +317,7 @@ public class RootInputInitializerManager {
       this.initializer = initializer;
       this.context = context;
       this.vertexLogIdentifier = vertex.getLogIdentifier();
+      this.vertexId = vertex.getVertexId();
       this.stateChangeNotifier = stateChangeNotifier;
       this.appContext = appContext;
     }
@@ -348,6 +360,7 @@ public class RootInputInitializerManager {
       }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void onStateUpdated(VertexStateUpdate event) {
       if (isComplete()) {
@@ -357,7 +370,13 @@ public class RootInputInitializerManager {
               " since initializer " + input.getName() + " is already complete.");
         }
       } else {
-        initializer.onVertexStateUpdated(event);
+        try {
+          initializer.onVertexStateUpdated(event);
+        } catch (Exception e) {
+          appContext.getEventHandler().handle(
+              new VertexEventRootInputFailed(vertexId, input.getName(),
+                  new AMUserCodeException(Source.InputInitializer,e)));
+        }
       }
     }
 
@@ -455,14 +474,15 @@ public class RootInputInitializerManager {
       sendEvents(toForwardEvents);
     }
 
+    @SuppressWarnings("unchecked")
     private void sendEvents(List<InputInitializerEvent> events) {
       if (events != null && !events.isEmpty()) {
         try {
           initializer.handleInputInitializerEvent(events);
         } catch (Exception e) {
-          throw new TezUncheckedException(
-              "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() +
-                  " failed to process events", e);
+          appContext.getEventHandler().handle(
+              new VertexEventRootInputFailed(vertexId, input.getName(),
+                  new AMUserCodeException(Source.InputInitializer,e)));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
index 4ab4ae9..b7701d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
@@ -18,20 +18,21 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.records.TezVertexID;
 
 public class VertexEventRootInputFailed extends VertexEvent {
   
   private final String inputName;
-  private final Throwable error;
+  private final AMUserCodeException error;
 
-  public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, Throwable error) {
+  public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, AMUserCodeException error) {
     super(vertexId, VertexEventType.V_ROOT_INPUT_FAILED);
     this.inputName = inputName;
     this.error = error;
   }
   
-  public Throwable getError() {
+  public AMUserCodeException getError() {
     return this.error;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6dccf3a..cddcbd5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1686,9 +1686,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         job.enactKill(
-            DAGTerminationCause.VERTEX_FAILURE,
-            vertexEvent.getVertexTerminationCause() == null ? VertexTerminationCause.OTHER_VERTEX_FAILURE
-                : vertexEvent.getVertexTerminationCause());
+            DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
         job.vertexFailed(vertex);
         forceTransitionToKillWait = true;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c182810..4a88949 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -335,7 +335,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.INITED, VertexState.FAILED),
               VertexEventType.V_READY_TO_INIT,
               new VertexInitializedTransition())
-          .addTransition(VertexState.INITIALIZING, VertexState.FAILED,
+          .addTransition(VertexState.INITIALIZING,
+              EnumSet.of(VertexState.FAILED),
               VertexEventType.V_ROOT_INPUT_FAILED,
               new RootInputInitFailedTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
@@ -367,6 +368,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Transitions from INITED state
           // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
           // they must complete before this vertex can start.
+          .addTransition(VertexState.INITED,
+              EnumSet.of(VertexState.FAILED),
+              VertexEventType.V_ROOT_INPUT_FAILED,
+              new RootInputInitFailedTransition())
           .addTransition
               (VertexState.INITED,
                   EnumSet.of(VertexState.INITED, VertexState.ERROR),
@@ -399,6 +404,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from RUNNING state
+          .addTransition(VertexState.RUNNING,
+              EnumSet.of(VertexState.TERMINATING),
+              VertexEventType.V_ROOT_INPUT_FAILED,
+              new RootInputInitFailedTransition())
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
               VertexEventType.V_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -451,6 +460,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
@@ -483,6 +493,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               new TaskCompletedAfterVertexSuccessTransition())
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   // after we are done reruns of source tasks should not affect
                   // us. These reruns may be triggered by other consumer vertices.
@@ -501,6 +512,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_START,
@@ -522,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.KILLED, VertexState.KILLED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_INIT,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_START,
@@ -541,6 +554,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.ERROR,
               VertexState.ERROR,
               EnumSet.of(VertexEventType.V_INIT,
+                  VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
@@ -1654,7 +1668,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
       else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
         vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to VertexManager failed. "
+        String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
+      else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. "
             + "failedTasks:"
             + vertex.failedTaskCount
             + " killedTasks:"
@@ -1665,7 +1690,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
       else {
         //should never occur
-        throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
+        throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
             + ", failedTaskCount=" + vertex.failedTaskCount
             + ", killedTaskCount=" + vertex.killedTaskCount
             + ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -3202,19 +3227,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private static class RootInputInitFailedTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
-    public void transition(VertexImpl vertex, VertexEvent event) {
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event;
       String msg = "Vertex Input: " + fe.getInputName()
           + " initializer failed, vertex=" + vertex.getLogIdentifier();
-      if (fe.getError() != null) {
-        msg = msg + ExceptionUtils.getStackTrace(fe.getError());
+      LOG.error(msg, fe.getError());
+      if (vertex.getState() == VertexState.RUNNING) {
+        vertex.addDiagnostic(msg
+              + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause()));
+        vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE,
+            TaskTerminationCause.AM_USERCODE_FAILURE);
+        return VertexState.TERMINATING;
+      } else {
+        vertex.finished(VertexState.FAILED,
+            VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg
+              + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause()));
+        return VertexState.FAILED;
       }
-      LOG.error(msg);
-      vertex.finished(VertexState.FAILED,
-          VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fdf0e07..ef2c7bd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -125,6 +125,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
@@ -4647,7 +4648,8 @@ public class TestVertexImpl {
       super.runInputInitializers(inputs);
       eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
           .get(0).getName(),
-          new RuntimeException("MockInitializerFailed")));
+          new AMUserCodeException(Source.InputInitializer,
+              new RuntimeException("MockInitializerFailed"))));
       dispatcher.await();
     }
 
@@ -4969,9 +4971,164 @@ public class TestVertexImpl {
     initVertex(v1);
     String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
     assertTrue(diagnostics.contains(IIExceptionLocation.Initialize.name()));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
   }
   
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithIIException();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    // INIT_SUCCEEDED followed by INIT_FAILURE
+    initVertex(v1);
+    dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+        v1.getVertexId(), "input1", null));
+    dispatcher.await();
+
+    String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
+    assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name()));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithIIException();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    initVertex(v1);
+    dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+        v1.getVertexId(), "input1", null));
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+
+    String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
+    assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name()));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+    initVertex(v1);
+    startVertex(v1);
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    dispatcher.await();
+
+    // Wait for the initializer to be invoked - which may be a separate thread.
+    while (!initializer.initStarted.get()) {
+      Thread.sleep(10);
+    }
+    Assert.assertFalse(initializer.eventReceived.get());
+    Assert.assertFalse(initializer.initComplete.get());
+
+    // Signal the initializer by sending an event - via vertex1
+    InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null);
+    // Create taskId and taskAttemptId for the single task that exists in vertex1
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
+
+    // At least one task attempt must succeed, otherwise input initializer events won't be handled.
+    dispatcher.getEventHandler().handle(new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED));
+    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(ta0_t0_v1, TaskEventType.T_ATTEMPT_SUCCEEDED));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // This causes v2 to fail, as its II throws an exception in handleInputInitializerEvent
+    String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+    assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name()));
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+  }
+
+  @Test(timeout = 5000)
+  public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+    initVertex(v1);
+    startVertex(v1);  // v2 would get the state update from v1
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+    assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
+    Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+    initVertex(v1);
+    startVertex(v1);  // v2 would get the state update from v1
+    // it should be OK to receive an INIT_SUCCEEDED event after an INIT_FAILED event
+    dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+        v2.getVertexId(), "input1", null));
+
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+    assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
+    Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+  }
 
   @InterfaceAudience.Private
   public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
@@ -5119,6 +5276,9 @@ public class TestVertexImpl {
   
   public static enum IIExceptionLocation {
     Initialize,
+    Initialize2, // for test case that InputInitFailed after InputInitSucceeded
+    HandleInputInitializerEvent,
+    OnVertexStateUpdated
   }
 
   @InterfaceAudience.Private
@@ -5154,6 +5314,16 @@ public class TestVertexImpl {
       if (exLocation == IIExceptionLocation.Initialize) {
         throw new Exception(exLocation.name());
       }
+      if (exLocation == IIExceptionLocation.Initialize2) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // InputInitializerManager is shut down once initialization has succeeded;
+          // catch the resulting exception and throw a new one to simulate the case of
+          // an init failure after init succeeded
+          throw new Exception(exLocation.name());
+        }
+      }
       context.registerForVertexStateUpdates("vertex1", null);
       initStarted.set(true);
       lock.lock();
@@ -5175,6 +5345,9 @@ public class TestVertexImpl {
     @Override
     public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
         Exception {
+      if (exLocation == IIExceptionLocation.HandleInputInitializerEvent) {
+        throw new Exception(exLocation.name());
+      }
       initializerEvents.addAll(events);
       if (initializerEvents.size() == numExpectedEvents) {
         eventReceived.set(true);
@@ -5197,6 +5370,9 @@ public class TestVertexImpl {
     }
 
     public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+      if (exLocation == IIExceptionLocation.OnVertexStateUpdated) {
+        throw new RuntimeException(exLocation.name());
+      }
       stateUpdates.add(stateUpdate);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index eef6ab3..0175d7b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -62,9 +63,10 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
 import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
-import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -84,8 +86,8 @@ import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.TestAMRecovery.DoNothingProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -210,7 +212,7 @@ public class TestExceptionPropagation {
    * @throws Exception
    * 
    */
-  @Test(timeout = 120000)
+  @Test(timeout = 180000)
   public void testExceptionPropagationSession() throws Exception {
     try {
       startSessionClient();
@@ -309,6 +311,10 @@ public class TestExceptionPropagation {
     EM_RouteInputErrorEventToSource,
     // Not Supported yet
     // EM_RouteInputSourceTaskFailedEventToDestination,
+
+    // II
+    II_Initialize, II_HandleInputInitializerEvents, II_OnVertexStateUpdated
+
   }
 
   /**
@@ -333,22 +339,31 @@ public class TestExceptionPropagation {
     v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));
 
     Vertex v2 = 
-        Vertex.create("v2", 
-            ProcessorDescriptor.create(SleepProcessor.class.getName())
-              .setUserPayload(new SleepProcessorConfig(3).toUserPayload())
-            , 1);
-    v2.setVertexManagerPlugin(ShuffleVertexManagerWithException.getVMDesc(exLocation));
+        Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1);
+    v2.addDataSource("input2",
+        DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
+          InputInitializerWithException2.getIIDesc(payload), null));
 
     dag.addVertex(v1)
-      .addVertex(v2)
-      .addEdge(Edge.create(v1, v2, EdgeProperty.create(
-        EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
-          .setUserPayload(payload),
-        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-        OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+      .addVertex(v2);
+    if (exLocation.name().startsWith("EM_")) {
+      dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(
+          EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
+            .setUserPayload(payload),
+          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+          OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+    } else {
+      // set the customized VertexManager here; it can't be used with the custom edge
+      v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exLocation));
+      dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+          OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+    }
+
     return dag;
   }
 
+  // InputInitializer of vertex1
   public static class InputInitializerWithException extends InputInitializer {
 
     private ExceptionLocation exLocation;
@@ -380,7 +395,65 @@ public class TestExceptionPropagation {
     }
   }
 
-  // input of vertex2
+  // InputInitializer of vertex2
+  public static class InputInitializerWithException2 extends InputInitializer {
+
+    private ExceptionLocation exLocation;
+    private Object condition = new Object();
+
+    public InputInitializerWithException2(
+        InputInitializerContext initializerContext) {
+      super(initializerContext);
+      this.exLocation =
+          ExceptionLocation.valueOf(new String(getContext().getUserPayload()
+              .deepCopyAsArray()));
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      if (exLocation == ExceptionLocation.II_Initialize) {
+        throw new Exception(exLocation.name());
+      }
+      if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+        getContext().registerForVertexStateUpdates("v1", null);
+      }
+
+      if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents
+          || exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+        // block here so handleInputInitializerEvent() or onVertexStateUpdated() can be invoked
+        synchronized (condition) {
+          condition.wait();
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+        throws Exception {
+      if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
+        throw new RuntimeException(exLocation.name());
+      }
+    }
+
+    @Override
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+        throws Exception {
+      if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+        throw new Exception(exLocation.name());
+      }
+      super.onVertexStateUpdated(stateUpdate);
+    }
+
+    public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
+      return InputInitializerDescriptor.create(
+          InputInitializerWithException2.class.getName())
+          .setUserPayload(payload);
+    }
+  }
+
+  // Input of vertex2
   public static class InputWithException extends AbstractLogicalInput {
 
     private ExceptionLocation exLocation;
@@ -436,10 +509,12 @@ public class TestExceptionPropagation {
       getContext().requestInitialMemory(0l, null); // mandatory call
       if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
         throw new Exception(this.exLocation.name());
-      } else if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
-          || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
-        Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
-        return Lists.newArrayList(errorEvent);
+      } else if ( getContext().getSourceVertexName().equals("v1")) {
+        if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
+            || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
+          Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
+          return Lists.newArrayList(errorEvent);
+        }
       }
       return null;
     }
@@ -450,7 +525,7 @@ public class TestExceptionPropagation {
     }
   }
 
-  // output of vertex1
+  // Output of vertex1
   public static class OutputWithException extends AbstractLogicalOutput {
 
     private ExceptionLocation exLocation;
@@ -497,8 +572,12 @@ public class TestExceptionPropagation {
         List<Event> events = new ArrayList<Event>();
         events.add(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])));
         return events;
-      }
-      else {
+      } else if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
+        // send an InputInitializerEvent to the InputInitializer of v2
+        List<Event> events = new ArrayList<Event>();
+        events.add(InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0])));
+        return events;
+      } else {
         return null;
       }
     }
@@ -576,6 +655,7 @@ public class TestExceptionPropagation {
     }
   }
 
+  // VertexManager of vertex1
   public static class RootInputVertexManagerWithException extends RootInputVertexManager {
 
     private ExceptionLocation exLocation;
@@ -618,12 +698,13 @@ public class TestExceptionPropagation {
     }
   }
 
-  public static class ShuffleVertexManagerWithException extends ShuffleVertexManager {
+  // VertexManager of vertex2
+  public static class InputReadyVertexManagerWithException extends InputReadyVertexManager {
 
     private ExceptionLocation exLocation;
     private static final String Test_ExceptionLocation = "Test.ExceptionLocation";
 
-    public ShuffleVertexManagerWithException(VertexManagerPluginContext context) {
+    public InputReadyVertexManagerWithException(VertexManagerPluginContext context) {
       super(context);
     }
 
@@ -666,12 +747,13 @@ public class TestExceptionPropagation {
       Configuration conf = new Configuration();
       conf.set(Test_ExceptionLocation, exLocation.name());
       UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
-      return VertexManagerPluginDescriptor.create(ShuffleVertexManagerWithException.class.getName())
+      return VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName())
               .setUserPayload(payload);
     }
   }
 
-  public static class CustomEdgeManager extends ScatterGatherEdgeManager {
+  // EdgeManager for edge linking vertex1 and vertex2
+  public static class CustomEdgeManager extends OneToOneEdgeManager {
 
     private ExceptionLocation exLocation;