You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:15 UTC

[21/24] tez git commit: TEZ-3029. Add an onError method to service plugin contexts. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 28670ff..fd56495 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -711,11 +712,11 @@ public class TestCommit {
         TaskState.SUCCEEDED));
     Assert.assertEquals(VertexState.COMMITTING, v1.getState());
     // kill dag which will trigger the vertex killed event
-    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    dag.handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, null));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v1.getState());
     Assert.assertTrue(v1.commitFutures.isEmpty());
-    Assert.assertEquals(VertexTerminationCause.DAG_KILL,
+    Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED,
         v1.getTerminationCause());
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     Assert
@@ -1514,10 +1515,20 @@ public class TestCommit {
     // Assert.assertEquals(0, v3OutputCommitter.abortCounter);
   }
 
-  // Kill dag while it is in COMMITTING in the case of
-  // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true
+
   @Test(timeout = 5000)
   public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception {
+    _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testServiceErrorWhileCommitting1_OnDAGSuccess() throws Exception {
+    _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  // Terminate the dag (kill or service plugin error) while it is in COMMITTING,
+  // when TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true
+  private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan(true, true));
@@ -1534,14 +1545,14 @@ public class TestCommit {
     v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
         TaskState.SUCCEEDED));
     waitUntil(dag, DAGState.COMMITTING);
-    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
-    waitUntil(dag, DAGState.KILLED);
+    dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+    waitUntil(dag, terminationCause.getFinishedState());
 
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
     Assert
-        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+        .assertEquals(terminationCause, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
     historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
     historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1569,10 +1580,20 @@ public class TestCommit {
     Assert.assertEquals(1, v3OutputCommitter.abortCounter);
   }
 
-  // Kill dag while it is in COMMITTING in the case of
-  // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false
+
   @Test(timeout = 5000)
   public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception {
+    _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testServiceErrorWhileCommitting1_OnVertexSuccess() throws Exception {
+    _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  // Terminate the dag (kill or service plugin error) while it is in COMMITTING,
+  // when TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false
+  private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         false);
     setupDAG(createDAGPlan(true, true));
@@ -1596,15 +1617,15 @@ public class TestCommit {
     v3OutputCommitter.unblockCommit();
     // dag go to COMMITTING due to the pending commit of v12Out
     waitUntil(dag, DAGState.COMMITTING);
-    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
-    waitUntil(dag, DAGState.KILLED);
+    dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+    waitUntil(dag, terminationCause.getFinishedState());
 
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
     Assert
-        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+        .assertEquals(terminationCause, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
     historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
     historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1631,9 +1652,18 @@ public class TestCommit {
     Assert.assertEquals(1, v3OutputCommitter.abortCounter);
   }
 
-  // DAG killed while dag is still in RUNNING and vertex is in COMMITTING
   @Test(timeout = 5000)
   public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception {
+    _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testServiceErrorWhileRunning_OnVertexSuccess() throws Exception {
+    _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  // DAG terminated (kill or service plugin error) while dag is still in RUNNING and vertex is in COMMITTING
+  private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         false);
     setupDAG(createDAGPlan(true, true));
@@ -1652,17 +1682,17 @@ public class TestCommit {
     Assert.assertEquals(VertexState.COMMITTING, v3.getState());
     // dag is still in RUNNING because v3 has not completed
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
-    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
-    waitUntil(dag, DAGState.KILLED);
+    dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+    waitUntil(dag, terminationCause.getFinishedState());
 
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
     Assert.assertEquals(VertexState.KILLED, v3.getState());
-    Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause());
+    Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause());
     Assert.assertTrue(v3.commitFutures.isEmpty());
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
     Assert
-        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+        .assertEquals(terminationCause, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
     // commit uv12 may not have started, so can't verify the VertexGroupCommitStartedEvent
     historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1903,10 +1933,19 @@ public class TestCommit {
     Assert.assertEquals(1, v3OutputCommitter.abortCounter);
   }
 
-  // test commit will be canceled no matter it is started or still in the threadpool
-  // ControlledThreadPoolExecutor is used for to not schedule the commits
   @Test(timeout = 5000)
   public void testCommitCanceled_OnDAGSuccess() throws Exception {
+    _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testCommitCanceled_OnDAGSuccess2() throws Exception {
+    _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  // Test that a commit is canceled whether it has already started or is still queued in the thread pool.
+  // ControlledThreadPoolExecutor is used to keep the commits from being scheduled.
+  private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan(true, true));
@@ -1931,10 +1970,10 @@ public class TestCommit {
     // mean the commits have been submitted to ThreadPool
     Assert.assertEquals(2, dag.commitFutures.size());
 
-    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
-    waitUntil(dag, DAGState.KILLED);
+    dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+    waitUntil(dag, terminationCause.getFinishedState());
     
-    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertEquals(terminationCause, dag.getTerminationCause());
     // mean the commits have been canceled
     Assert.assertTrue(dag.commitFutures.isEmpty());
     historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 2158368..480e3cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -41,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.slf4j.Logger;
@@ -1641,8 +1641,7 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    dispatcher.getEventHandler().handle(
-        new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
     dispatcher.await();
 
     Assert.assertEquals(DAGState.KILLED, dag.getState());
@@ -1654,9 +1653,18 @@ public class TestDAGImpl {
 
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testKillRunningDAG() {
+    _testTerminateRunningDAG(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testServiceErrorRunningDAG() {
+    _testTerminateRunningDAG(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void _testTerminateRunningDAG(DAGTerminationCause terminationCause) {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
@@ -1674,7 +1682,7 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
     Assert.assertEquals(VertexState.RUNNING, v1.getState());
 
-    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
     dispatcher.await();
 
     Assert.assertEquals(DAGState.TERMINATING, dag.getState());
@@ -1817,7 +1825,7 @@ public class TestDAGImpl {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           TezVertexID.getInstance(dagId, 5), VertexState.FAILED));
     } else if (testState == DAGStatus.State.KILLED) {
-      dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+      dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
     } else if (testState == DAGStatus.State.ERROR) {
       dispatcher.getEventHandler().handle(new DAGEventStartDag(dagId, new LinkedList<URL>()));
     } else {
@@ -1871,11 +1879,21 @@ public class TestDAGImpl {
     }
   }
 
+
+  @Test(timeout = 5000)
+  public void testDAGKill() {
+    _testDAGTerminate(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGServiceError() {
+    _testDAGTerminate(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
   // Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be
   // marked as KILLED.
   @SuppressWarnings("unchecked")
-  @Test(timeout = 5000)
-  public void testDAGKill() {
+  private void _testDAGTerminate(DAGTerminationCause terminationCause) {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
@@ -1887,10 +1905,10 @@ public class TestDAGImpl {
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
     dispatcher.await();
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
-    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
+    Assert.assertEquals(terminationCause, dag.getTerminationCause());
     Assert.assertEquals(2, dag.getSuccessfulVertices());
 
     int killedCount = 0;
@@ -1902,16 +1920,25 @@ public class TestDAGImpl {
     Assert.assertEquals(4, killedCount);
 
     for (Vertex v : dag.getVertices().values()) {
-      Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+      Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
     }
 
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  @Test(timeout = 5000)
+  public void testDAGKillVertexSuccessAfterTerminated() {
+    _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGServiceErrorVertexSuccessAfterTerminated() {
+    _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
   // Vertices succeed after a DAG kill has been processed. Should be ignored.
   @SuppressWarnings("unchecked")
-  @Test(timeout = 5000)
-  public void testDAGKillVertexSuccessAfterKill() {
+  private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause terminationCause) {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
@@ -1923,10 +1950,10 @@ public class TestDAGImpl {
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
     dispatcher.await();
 
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
 
     // Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored.
     for (int i = 2; i < 6; ++i) {
@@ -1943,18 +1970,27 @@ public class TestDAGImpl {
     }
     Assert.assertEquals(4, killedCount);
 
-    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertEquals(terminationCause, dag.getTerminationCause());
     Assert.assertEquals(2, dag.getSuccessfulVertices());
     for (Vertex v : dag.getVertices().values()) {
-      Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+      Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
     }
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
-  // Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGKillPending() {
+    _testDAGKillPending(DAGTerminationCause.DAG_KILL);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGServiceErrorPending() {
+    _testDAGKillPending(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+  }
+
+  // Vertex KILLED after a DAG termination is issued. Vertex termination cause should be DAG_TERMINATED
+  @SuppressWarnings("unchecked")
+  private void _testDAGKillPending(DAGTerminationCause terminationCause) {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
@@ -1972,17 +2008,17 @@ public class TestDAGImpl {
           TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
-    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
     dispatcher.await();
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
     dispatcher.await();
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
     Assert.assertEquals(5, dag.getSuccessfulVertices());
     Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(),
-        VertexTerminationCause.DAG_KILL);
+        VertexTerminationCause.DAG_TERMINATED);
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 986f64d..659d099 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -143,7 +143,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
@@ -189,7 +188,6 @@ import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
@@ -206,7 +204,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.internal.util.collections.Sets;
 
@@ -2515,10 +2512,10 @@ public class TestVertexImpl {
 
   private void killVertex(VertexImpl v) {
     dispatcher.getEventHandler().handle(
-        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
-    Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_KILL);
+    Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED);
   }
 
   private void startVertex(VertexImpl v,
@@ -3322,7 +3319,7 @@ public class TestVertexImpl {
         StringUtils.join(v3.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH);
     assertTrue(diagnostics.contains(
         "vertex received kill while in running state"));
-    Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause());
+    Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause());
     assertTrue(diagnostics.contains(v3.getTerminationCause().name().toLowerCase(Locale.ENGLISH)));
   }
 
@@ -3334,7 +3331,7 @@ public class TestVertexImpl {
     startVertex(v);
 
     dispatcher.getEventHandler().handle(
-        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
 
@@ -3359,7 +3356,7 @@ public class TestVertexImpl {
     startVertex(v);
 
     dispatcher.getEventHandler().handle(
-        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
index 1f75afb..b3568eb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -52,15 +53,21 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -244,6 +251,75 @@ public class TestContainerLauncherManager {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testReportFailureFromContainerLauncher() throws ServicePluginException, TezException {
+    final String dagName = DAG_NAME;
+    final int dagIndex = DAG_INDEX;
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), dagIndex);
+    DAG dag = mock(DAG.class);
+    doReturn(dagName).when(dag).getName();
+    doReturn(dagId).when(dag).getID();
+    EventHandler eventHandler = mock(EventHandler.class);
+    AppContext appContext = mock(AppContext.class);
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn("testlauncher").when(appContext).getContainerLauncherName(0);
+
+    NamedEntityDescriptor<TaskCommunicatorDescriptor> taskCommDescriptor =
+        new NamedEntityDescriptor<>("testlauncher", ContainerLauncherForTest.class.getName());
+    List<NamedEntityDescriptor> list = new LinkedList<>();
+    list.add(taskCommDescriptor);
+    ContainerLauncherManager containerLauncherManager =
+        new ContainerLauncherManager(appContext, mock(TaskCommunicatorManagerInterface.class), "",
+            list, false);
+
+    try {
+      ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
+      Container container1 = mock(Container.class);
+      ContainerLauncherLaunchRequestEvent launchRequestEvent =
+          new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0);
+
+
+      containerLauncherManager.handle(launchRequestEvent);
+
+      ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+      Event rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError event =
+          (DAGAppMasterEventUserServiceFatalError) rawEvent;
+      assertEquals(DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError"));
+      assertTrue(
+          event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+      assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]"));
+
+      reset(eventHandler);
+      // stop container
+
+      ContainerId containerId2 = mock(ContainerId.class);
+      NodeId nodeId2 = mock(NodeId.class);
+      ContainerLauncherStopRequestEvent stopRequestEvent =
+          new ContainerLauncherStopRequestEvent(containerId2, nodeId2, null, 0, 0, 0);
+
+      argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+      containerLauncherManager.handle(stopRequestEvent);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+      rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGEventTerminateDag);
+      DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent;
+      assertTrue(killEvent.getDiagnosticInfo().contains("ReportError"));
+      assertTrue(killEvent.getDiagnosticInfo()
+          .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+      assertTrue(killEvent.getDiagnosticInfo().contains("[0:testlauncher]"));
+    } finally {
+      containerLauncherManager.stop();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testContainerLauncherUserError() throws ServicePluginException {
 
     ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -256,7 +332,8 @@ public class TestContainerLauncherManager {
     Configuration conf = new Configuration(false);
 
     ContainerLauncherManager containerLauncherManager =
-        new ContainerLauncherManager(containerLauncher, appContext);
+        new ContainerLauncherManager(appContext);
+    containerLauncherManager.setContainerLauncher(containerLauncher);
     try {
       containerLauncherManager.init(conf);
       containerLauncherManager.start();
@@ -437,4 +514,26 @@ public class TestContainerLauncherManager {
     }
   }
 
+  private static final String DAG_NAME = "dagName";
+  private static final int DAG_INDEX = 1;
+  public static class ContainerLauncherForTest extends ContainerLauncher {
+
+    public ContainerLauncherForTest(
+        ContainerLauncherContext containerLauncherContext) {
+      super(containerLauncherContext);
+    }
+
+    @Override
+    public void launchContainer(ContainerLaunchRequest launchRequest) throws
+        ServicePluginException {
+      getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+    }
+
+    @Override
+    public void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException {
+      getContext()
+          .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index f69d8be..a3e5ff5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -49,6 +50,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.IOExceptionWithCause;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,6 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDr
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
+import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
 import org.junit.After;
@@ -503,10 +506,14 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockApp).nodesUpdated(mockUpdatedNodes);
 
-    Exception mockException = mock(Exception.class);
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    Exception mockException = new IOException("mockexception");
     scheduler.onError(mockException);
     drainableAppCallback.drain();
-    verify(mockApp).onError(mockException);
+    verify(mockApp)
+        .reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(),
+            any(DagInfo.class));
+    assertTrue(argumentCaptor.getValue().contains("mockexception"));
 
     scheduler.onShutdownRequest();
     drainableAppCallback.drain();
@@ -1220,10 +1227,14 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockApp).nodesUpdated(mockUpdatedNodes);
 
-    Exception mockException = mock(Exception.class);
+
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    Exception mockException = new IOException("mockexception");
     scheduler.onError(mockException);
     drainableAppCallback.drain();
-    verify(mockApp).onError(mockException);
+    verify(mockApp).reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(),
+            any(DagInfo.class));
+    assertTrue(argumentCaptor.getValue().contains("mockexception"));
 
     scheduler.onShutdownRequest();
     drainableAppCallback.drain();

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index b54d024..ab85751 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -68,6 +70,8 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
@@ -283,9 +287,10 @@ class TestTaskSchedulerHelpers {
     }
 
     @Override
-    public void onError(Throwable t) {
+    public void reportError(@Nonnull ServicePluginError servicePluginError, String message,
+                            DagInfo dagInfo) {
       invocations++;
-      real.onError(t);
+      real.reportError(servicePluginError, message, dagInfo);
     }
 
     @Override
@@ -327,6 +332,12 @@ class TestTaskSchedulerHelpers {
       return real.getApplicationAttemptId();
     }
 
+    @Nullable
+    @Override
+    public DagInfo getCurrentDagInfo() {
+      return real.getCurrentDagInfo(); // pass-through to real; result may be null (@Nullable)
+    }
+
     @Override
     public String getAppHostName() {
       return real.getAppHostName();

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 4d828e2..791bb7f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -28,11 +28,13 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -71,9 +73,11 @@ import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
 import org.apache.tez.dag.app.dag.impl.TaskImpl;
 import org.apache.tez.dag.app.dag.impl.VertexImpl;
@@ -84,16 +88,19 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
 import org.apache.tez.dag.app.web.WebUIService;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -539,6 +546,81 @@ public class TestTaskSchedulerManager {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testReportFailureFromTaskScheduler() {
+    // The test plugin reports one error with a DagInfo and one without; both events are checked.
+    final String schedulerName = "testTaskScheduler";
+    final String expectedId = "[0:" + schedulerName + "]";
+    final Configuration amConf = new TezConfiguration();
+    EventHandler handler = mock(EventHandler.class);
+    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    doReturn(schedulerName).when(mockAppContext).getTaskSchedulerName(0);
+    doReturn(handler).when(mockAppContext).getEventHandler();
+    doReturn(amConf).when(mockAppContext).getAMConf();
+
+    // Client server mock, stubbed only with a bind address.
+    InetSocketAddress bindAddress = new InetSocketAddress("host", 55000);
+    DAGClientServer clientServer = mock(DAGClientServer.class);
+    doReturn(bindAddress).when(clientServer).getBindAddress();
+
+    // Current DAG exposed through the mocked AppContext.
+    DAG currentDag = mock(DAG.class);
+    TezDAGID currentDagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX);
+    doReturn(DAG_NAME).when(currentDag).getName();
+    doReturn(currentDagId).when(currentDag).getID();
+    doReturn(currentDag).when(mockAppContext).getCurrentDAG();
+
+    // Register TaskSchedulerForFailureTest as the only scheduler plugin.
+    NamedEntityDescriptor<TaskSchedulerDescriptor> schedulerDescriptor =
+        new NamedEntityDescriptor<>(schedulerName, TaskSchedulerForFailureTest.class.getName());
+    List<NamedEntityDescriptor> descriptors = new LinkedList<>();
+    descriptors.add(schedulerDescriptor);
+
+    TaskSchedulerManager manager =
+        new TaskSchedulerManager(mockAppContext, clientServer, handler,
+            mock(ContainerSignatureMatcher.class), mock(WebUIService.class), descriptors, false) {
+          @Override
+          TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
+            // Hand the raw context back so callbacks are not wrapped in threads.
+            return rawContext;
+          }
+        };
+    try {
+      manager.init(new TezConfiguration());
+      manager.start();
+
+      // getTotalResources() reports SERVICE_UNAVAILABLE together with a DagInfo.
+      manager.getTotalResources(0);
+      ArgumentCaptor<Event> dagErrorCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(handler, times(1)).handle(dagErrorCaptor.capture());
+      Event dagErrorEvent = dagErrorCaptor.getValue();
+      assertTrue(dagErrorEvent instanceof DAGEventTerminateDag);
+      DAGEventTerminateDag terminateEvent = (DAGEventTerminateDag) dagErrorEvent;
+      assertTrue(terminateEvent.getDiagnosticInfo().contains("ReportError"));
+      assertTrue(terminateEvent.getDiagnosticInfo()
+          .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+      assertTrue(terminateEvent.getDiagnosticInfo().contains(expectedId));
+
+      // getAvailableResources() reports INCONSISTENT_STATE with a null DagInfo.
+      reset(handler);
+      manager.getAvailableResources(0);
+      ArgumentCaptor<Event> fatalErrorCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(handler, times(1)).handle(fatalErrorCaptor.capture());
+      Event fatalErrorEvent = fatalErrorCaptor.getValue();
+      assertTrue(fatalErrorEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError fatalError =
+          (DAGAppMasterEventUserServiceFatalError) fatalErrorEvent;
+      assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, fatalError.getType());
+      assertTrue(fatalError.getDiagnosticInfo().contains("ReportedFatalError"));
+      assertTrue(fatalError.getDiagnosticInfo()
+          .contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+      assertTrue(fatalError.getDiagnosticInfo().contains(expectedId));
+    } finally {
+      manager.stop();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testTaskSchedulerUserError() {
     TaskScheduler taskScheduler = mock(TaskScheduler.class, new ExceptionAnswer());
 
@@ -798,4 +880,83 @@ public class TestTaskSchedulerManager {
       return false;
     }
   }
+
+  private static final String DAG_NAME = "dagName";
+  private static final int DAG_INDEX = 1;
+  public static class TaskSchedulerForFailureTest extends TaskScheduler {
+    // Test scheduler: the two resource getters call reportError(); all other methods are stubs.
+    public TaskSchedulerForFailureTest(TaskSchedulerContext taskSchedulerContext) {
+      super(taskSchedulerContext);
+    }
+    // Reports INCONSISTENT_STATE with a null DagInfo before returning a fixed Resource.
+    @Override
+    public Resource getAvailableResources() throws ServicePluginException {
+      getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+      return Resource.newInstance(1024, 1);
+    }
+    // Reports SERVICE_UNAVAILABLE with a DagInfo(DAG_INDEX, DAG_NAME) before returning.
+    @Override
+    public Resource getTotalResources() throws ServicePluginException {
+      getContext()
+          .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME));
+      return Resource.newInstance(1024, 1);
+    }
+
+    @Override
+    public int getClusterNodeCount() throws ServicePluginException {
+      return 0;
+    }
+
+    @Override
+    public void blacklistNode(NodeId nodeId) throws ServicePluginException {
+      // no-op
+    }
+
+    @Override
+    public void unblacklistNode(NodeId nodeId) throws ServicePluginException {
+      // no-op
+    }
+
+    @Override
+    public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                             Priority priority, Object containerSignature,
+                             Object clientCookie) throws
+        ServicePluginException {
+      // no-op
+    }
+
+    @Override
+    public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                             Priority priority, Object containerSignature,
+                             Object clientCookie) throws
+        ServicePluginException {
+      // no-op
+    }
+
+    @Override
+    public boolean deallocateTask(Object task, boolean taskSucceeded,
+                                  TaskAttemptEndReason endReason,
+                                  @Nullable String diagnostics) throws ServicePluginException {
+      return false;
+    }
+
+    @Override
+    public Object deallocateContainer(ContainerId containerId) throws ServicePluginException {
+      return null;
+    }
+
+    @Override
+    public void setShouldUnregister() throws ServicePluginException {
+      // no-op
+    }
+
+    @Override
+    public boolean hasUnregistered() throws ServicePluginException {
+      return false;
+    }
+
+    @Override
+    public void dagComplete() throws ServicePluginException {
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
new file mode 100644
index 0000000..f92513f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.helpers;
+
+import org.apache.tez.serviceplugins.api.DagInfo;
+
+public class DagInfoImplForTest implements DagInfo {
+  // Immutable DagInfo for tests: hands back the index and name given to the constructor.
+  private final int dagIndex;
+  private final String dagName;
+
+  public DagInfoImplForTest(int index, String name) {
+    dagIndex = index;
+    dagName = name;
+  }
+
+  @Override
+  public int getIndex() {
+    return dagIndex;
+  }
+
+  @Override
+  public String getName() {
+    return dagName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
new file mode 100644
index 0000000..32d1fb6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.ServicePluginContextBase;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
+
+public class ErrorPluginConfiguration {
+  // Test helper: carried as a UserPayload to the *WithErrors test plugins, which call processError().
+  public static final String REPORT_FATAL_ERROR_MESSAGE = "ReportedFatalError";
+  public static final String REPORT_NONFATAL_ERROR_MESSAGE = "ReportedError";
+  public static final String THROW_ERROR_EXCEPTION_STRING = "Simulated Error";
+
+  private static final String CONF_THROW_ERROR = "throw.error";
+  private static final String CONF_REPORT_ERROR = "report.error";
+  private static final String CONF_REPORT_ERROR_FATAL = "report.error.fatal";
+  private static final String CONF_REPORT_ERROR_DAG_NAME = "report.error.dag.name";
+
+  private final HashMap<String, String> kv;
+
+  private ErrorPluginConfiguration() {
+    this.kv = new HashMap<>();
+  }
+
+  private ErrorPluginConfiguration(HashMap<String, String> map) {
+    this.kv = map;
+  }
+
+  /** Creates a configuration under which {@link #processError} throws a RuntimeException. */
+  public static ErrorPluginConfiguration createThrowErrorConf() {
+    ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
+    conf.kv.put(CONF_THROW_ERROR, String.valueOf(true));
+    return conf;
+  }
+
+  /** Creates a configuration that reports a fatal error; a dagName of "*" matches any DAG. */
+  public static ErrorPluginConfiguration createReportFatalErrorConf(String dagName) {
+    return createReportErrorConf(true, dagName);
+  }
+
+  /** Like {@link #createReportFatalErrorConf}, but the reported error is non-fatal. */
+  public static ErrorPluginConfiguration createReportNonFatalErrorConf(String dagName) {
+    return createReportErrorConf(false, dagName);
+  }
+
+  /** Shared body of the two report-error factories; they differ only in the fatal flag. */
+  private static ErrorPluginConfiguration createReportErrorConf(boolean fatal, String dagName) {
+    ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
+    conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true));
+    conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(fatal));
+    conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName);
+    return conf;
+  }
+
+  /** Java-serializes the key/value map of {@code conf} into a {@link UserPayload}. */
+  public static UserPayload toUserPayload(ErrorPluginConfiguration conf) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // try-with-resources flushes and closes the stream even if writeObject() throws.
+    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(conf.kv);
+    }
+    return UserPayload.create(ByteBuffer.wrap(baos.toByteArray()));
+  }
+
+  /** Inverse of {@link #toUserPayload}; expects a payload produced by that method. */
+  @SuppressWarnings("unchecked")
+  public static ErrorPluginConfiguration toErrorPluginConfiguration(UserPayload userPayload) throws
+      IOException, ClassNotFoundException {
+    ByteBuffer payload = userPayload.getPayload();
+    byte[] b = new byte[payload.remaining()];
+    payload.get(b);
+    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
+      return new ErrorPluginConfiguration((HashMap<String, String>) ois.readObject());
+    }
+  }
+
+  /** Whether this configuration asks the plugin to throw. */
+  public boolean shouldThrowError() {
+    return Boolean.parseBoolean(kv.get(CONF_THROW_ERROR));
+  }
+
+  /** Whether to report a fatal error; a null or empty {@code dagName} always matches. */
+  public boolean shouldReportFatalError(String dagName) {
+    return shouldReportError(true, dagName);
+  }
+
+  /** Non-fatal counterpart of {@link #shouldReportFatalError}. */
+  public boolean shouldReportNonFatalError(String dagName) {
+    return shouldReportError(false, dagName);
+  }
+
+  // equals() is called on the non-null side, so a null configured DAG name no longer throws.
+  private boolean shouldReportError(boolean fatal, String dagName) {
+    String configured = kv.get(CONF_REPORT_ERROR_DAG_NAME);
+    return Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR))
+        && Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL)) == fatal
+        && (dagName == null || dagName.isEmpty() || "*".equals(configured)
+            || dagName.equals(configured));
+  }
+
+  /** Applies {@code conf}: throws, reports a fatal or non-fatal error, or does nothing. */
+  public static void processError(ErrorPluginConfiguration conf, ServicePluginContextBase context) {
+    if (conf.shouldThrowError()) {
+      throw new RuntimeException(ErrorPluginConfiguration.THROW_ERROR_EXCEPTION_STRING);
+    } else if (conf.shouldReportFatalError(null)) {
+      context.reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE,
+          ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, context.getCurrentDagInfo());
+    } else if (conf.shouldReportNonFatalError(null)) {
+      context.reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
+          ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE, context.getCurrentDagInfo());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
index d489cca..b4ea176 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
@@ -14,24 +14,33 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import java.io.IOException;
+
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 
 public class TezTestServiceContainerLauncherWithErrors extends ContainerLauncher {
+
+  private final ErrorPluginConfiguration conf;
+
   public TezTestServiceContainerLauncherWithErrors(
-      ContainerLauncherContext containerLauncherContext) {
+      ContainerLauncherContext containerLauncherContext) throws IOException,
+      ClassNotFoundException {
     super(containerLauncherContext);
+    conf = ErrorPluginConfiguration.toErrorPluginConfiguration(containerLauncherContext.getInitialUserPayload());
   }
 
   @Override
   public void launchContainer(ContainerLaunchRequest launchRequest) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void stopContainer(ContainerStopRequest stopRequest) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
index 1705eac..13d4815 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
@@ -16,18 +16,25 @@ package org.apache.tez.dag.app.rm;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler {
+
+  private final ErrorPluginConfiguration conf;
+
   public TezTestServiceTaskSchedulerServiceWithErrors(
-      TaskSchedulerContext taskSchedulerContext) {
+      TaskSchedulerContext taskSchedulerContext) throws IOException, ClassNotFoundException {
     super(taskSchedulerContext);
+    conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskSchedulerContext.getInitialUserPayload());
   }
 
   @Override
@@ -47,35 +54,37 @@ public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler
 
   @Override
   public void blacklistNode(NodeId nodeId) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void unblacklistNode(NodeId nodeId) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
                            Priority priority, Object containerSignature, Object clientCookie) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void allocateTask(Object task, Resource capability, ContainerId containerId,
                            Priority priority, Object containerSignature, Object clientCookie) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
                                 @Nullable String diagnostics) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
+    return true;
   }
 
   @Override
   public Object deallocateContainer(ContainerId containerId) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
index 90313d4..8221957 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
@@ -15,6 +15,7 @@
 package org.apache.tez.dag.app.taskcomm;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 
@@ -22,6 +23,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -31,20 +34,24 @@ import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 
 public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
+
+  private final ErrorPluginConfiguration conf;
+
   public TezTestServiceTaskCommunicatorWithErrors(
-      TaskCommunicatorContext taskCommunicatorContext) {
+      TaskCommunicatorContext taskCommunicatorContext) throws IOException, ClassNotFoundException {
     super(taskCommunicatorContext);
+    conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskCommunicatorContext.getInitialUserPayload());
   }
 
   @Override
   public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
                                    @Nullable String diagnostics) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
@@ -52,14 +59,14 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
                                          Map<String, LocalResource> additionalResources,
                                          Credentials credentials, boolean credentialsChanged,
                                          int priority) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
                                            TaskAttemptEndReason endReason,
                                            @Nullable String diagnostics) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
@@ -69,7 +76,7 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
 
   @Override
   public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
   }
 
   @Override
@@ -78,6 +85,7 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
 
   @Override
   public Object getMetaInfo() {
-    throw new RuntimeException("Simulated Error");
+    ErrorPluginConfiguration.processError(conf, getContext());
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
index bfd3ed2..ac6ebde 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
@@ -19,7 +19,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.List;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -40,6 +42,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
@@ -49,6 +52,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
 import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
 import org.apache.tez.examples.JoinValidateConfigured;
 import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
@@ -63,7 +67,13 @@ public class TestExternalTezServicesErrors {
   private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
 
   private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
-  private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail";
+  private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors";
+  private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors";
+  private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors";
+
+  private static final String SUFFIX_LAUNCHER = "ContainerLauncher";
+  private static final String SUFFIX_TASKCOMM = "TaskCommunicator";
+  private static final String SUFFIX_SCHEDULER = "TaskScheduler";
 
   private static ExternalTezServiceTestHelper extServiceTestHelper;
 
@@ -76,12 +86,32 @@ public class TestExternalTezServicesErrors {
   private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
       Vertex.VertexExecutionContext.create(
           EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
-  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL =
-      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
-  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL =
-      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME);
-  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL =
-      Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  // Throw error contexts
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+          EXT_THROW_ERROR_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW =
+      Vertex.VertexExecutionContext.create(EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+  // Report-non-fatal contexts
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+          EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+  // Report fatal contexts
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+          EXT_REPORT_FATAL_ERROR_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL =
+      Vertex.VertexExecutionContext.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
 
 
   private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
@@ -93,29 +123,63 @@ public class TestExternalTezServicesErrors {
   public static void setup() throws Exception {
 
     extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
+    UserPayload userPayload =
+        TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
+    UserPayload userPayloadThrowError =
+        ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf());
+
+    UserPayload userPayloadReportFatalErrorLauncher = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER));
+    UserPayload userPayloadReportFatalErrorTaskComm = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM));
+    UserPayload userPayloadReportFatalErrorScheduler = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER));
+
+    UserPayload userPayloadReportNonFatalErrorLauncher = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER));
+    UserPayload userPayloadReportNonFatalErrorTaskComm = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM));
+    UserPayload userPayloadReportNonFatalErrorScheduler = ErrorPluginConfiguration
+        .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER));
 
     TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
         TaskSchedulerDescriptor
             .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
             .setUserPayload(userPayload),
-        TaskSchedulerDescriptor.create(EXT_FAIL_ENTITY_NAME,
+        TaskSchedulerDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+            TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
+            userPayloadThrowError),
+        TaskSchedulerDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+            TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
+            userPayloadReportFatalErrorScheduler),
+        TaskSchedulerDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
             TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
-            userPayload)};
+            userPayloadReportNonFatalErrorScheduler),
+    };
 
     ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
         ContainerLauncherDescriptor
             .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
             .setUserPayload(userPayload),
-        ContainerLauncherDescriptor.create(EXT_FAIL_ENTITY_NAME,
-            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload)};
+        ContainerLauncherDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
+        ContainerLauncherDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorLauncher),
+        ContainerLauncherDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
+            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorLauncher)
+    };
 
     TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
         TaskCommunicatorDescriptor
             .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
             .setUserPayload(userPayload),
-        TaskCommunicatorDescriptor.create(EXT_FAIL_ENTITY_NAME,
-            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload)};
+        TaskCommunicatorDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
+        TaskCommunicatorDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorTaskComm),
+        TaskCommunicatorDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
+            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorTaskComm)
+    };
 
     servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
         taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
@@ -137,35 +201,86 @@ public class TestExternalTezServicesErrors {
     extServiceTestHelper.tearDownAll();
   }
 
-  @Test (timeout = 90000)
-  public void testContainerLauncherError() throws Exception {
-    testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL,
-        DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR);
+  @Test(timeout = 90000)
+  public void testContainerLauncherThrowError() throws Exception {
+    // The label feeds the session name and log lines in testFatalError, so it mirrors this test's name.
+    testFatalError("_testContainerLauncherThrowError_", EXECUTION_CONTEXT_LAUNCHER_THROW, SUFFIX_LAUNCHER,
+        Lists.newArrayList("Service Error", DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name()));
+  }
+
+  @Test(timeout = 90000)
+  public void testTaskCommunicatorThrowError() throws Exception {
+    // The label feeds the session name and log lines in testFatalError, so it must identify this test.
+    testFatalError("_testTaskCommunicatorThrowError_", EXECUTION_CONTEXT_TASKCOMM_THROW, SUFFIX_TASKCOMM,
+        Lists.newArrayList("Service Error", DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name()));
+  }
+
+  @Test(timeout = 90000)
+  public void testTaskSchedulerThrowError() throws Exception {
+    // The label feeds the session name and log lines in testFatalError, so it must identify this test.
+    testFatalError("_testTaskSchedulerThrowError_", EXECUTION_CONTEXT_SCHEDULER_THROW, SUFFIX_SCHEDULER,
+        Lists.newArrayList("Service Error", DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name()));
+  }
+
+  /**
+   * Submits one DAG per plugin type to a single shared session, using the non-fatal-error plugin variants.
+   */
+  @Test (timeout = 150000)
+  public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
+    final String testLabel = "testNonFatalErrors";
+    final String sessionName =
+        TestExternalTezServicesErrors.class.getSimpleName() + testLabel + "_session";
+    TezConfiguration sessionConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
+    TezClient session = TezClient.newBuilder(sessionName, sessionConf)
+        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+    try {
+      session.start();
+      LOG.info("TezSessionStarted for " + testLabel);
+      session.waitTillReady();
+      LOG.info("TezSession ready for submission for " + testLabel);
+      runAndVerifyForNonFatalErrors(session, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
+      runAndVerifyForNonFatalErrors(session, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
+      runAndVerifyForNonFatalErrors(session, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
+    } finally {
+      session.stop();
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testContainerLauncherReportFatalError() throws Exception {
+    List<String> expected = Lists.newArrayList(
+        ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name());
+    testFatalError("_testContainerLauncherReportFatalError_", EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL,
+        SUFFIX_LAUNCHER, expected);
   }
 
-  @Test (timeout = 90000)
-  public void testTaskCommunicatorError() throws Exception {
-    testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL,
-        DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR);
+  @Test(timeout = 90000)
+  public void testTaskCommReportFatalError() throws Exception {
+    List<String> expected = Lists.newArrayList(
+        ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name());
+    testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL, SUFFIX_TASKCOMM, expected);
   }
 
-  @Test (timeout = 90000)
-  public void testTaskSchedulerError() throws Exception {
-    testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL,
-        DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR);
+  @Test(timeout = 90000)
+  public void testTaskSchedulerReportFatalError() throws Exception {
+    List<String> expected = Lists.newArrayList(
+        ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name());
+    testFatalError("_testTaskSchedulerReportFatalError_", EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL,
+        SUFFIX_SCHEDULER, expected);
   }
 
-  private void testServiceError(String methodName,
-                                Vertex.VertexExecutionContext lhsExecutionContext,
-                                DAGAppMasterEventType expectedEventType) throws
-      IOException, TezException, InterruptedException, YarnException {
+
+  private void testFatalError(String methodName,
+                              Vertex.VertexExecutionContext lhsExecutionContext,
+                              String dagNameSuffix, List<String> expectedDiagMessages) throws
+      IOException, TezException, YarnException, InterruptedException {
     TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
     TezClient tezClient = TezClient
         .newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
             tezClientConf)
         .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
 
-    ApplicationId appId;
+    ApplicationId appId= null;
     try {
       tezClient.start();
       LOG.info("TezSessionStarted for " + methodName);
@@ -175,10 +290,11 @@ public class TestExternalTezServicesErrors {
       JoinValidateConfigured joinValidate =
           new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext,
               EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
-              EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest");
+              EXECUTION_CONTEXT_EXT_SERVICE_PUSH, dagNameSuffix);
 
       DAG dag = joinValidate
-          .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH,
+          .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
+              HASH_JOIN_EXPECTED_RESULT_PATH,
               HASH_JOIN_OUTPUT_PATH, 3);
 
       DAGClient dagClient = tezClient.submitDAG(dag);
@@ -188,14 +304,15 @@ public class TestExternalTezServicesErrors {
       assertEquals(DAGStatus.State.ERROR, dagStatus.getState());
       boolean foundDiag = false;
       for (String diag : dagStatus.getDiagnostics()) {
-        if (diag.contains("Service Error") && diag.contains(
-            expectedEventType.toString()) &&
-            diag.contains("Simulated Error")) {
-          foundDiag = true;
+        foundDiag = checkDiag(diag, expectedDiagMessages);
+        if (foundDiag) {
+          break;
         }
       }
       appId = tezClient.getAppMasterApplicationId();
       assertTrue(foundDiag);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     } finally {
       tezClient.stop();
     }
@@ -222,14 +339,58 @@ public class TestExternalTezServicesErrors {
         String diag = appAttemptReport.getDiagnostics();
         assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
         assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
-        assertTrue(diag.contains("Service Error") && diag.contains(
-            expectedEventType.toString()) &&
-            diag.contains("Simulated Error"));
-
+        checkDiag(diag, expectedDiagMessages);
       } finally {
         yarnClient.stop();
       }
     }
   }
 
+  /**
+   * Checks a single diagnostics string against a list of expected fragments.
+   *
+   * @return true when diag contains every entry of expected (also true for an empty list)
+   */
+  private boolean checkDiag(String diag, List<String> expected) {
+    for (String fragment : expected) {
+      if (!diag.contains(fragment)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Runs JoinValidate with lhsContext on its LHS side and verifies that the DAG ends in FAILED with the
+   * plugin's non-fatal error message and error type in its diagnostics.
+   *
+   * @param tezClient     session the DAG is submitted to
+   * @param componentName passed through to JoinValidateConfigured and named in the log line
+   * @param lhsContext    execution context handed to JoinValidateConfigured as its second argument
+   */
+  private void runAndVerifyForNonFatalErrors(TezClient tezClient, String componentName,
+                                             Vertex.VertexExecutionContext lhsContext)
+      throws TezException, InterruptedException, IOException {
+    // Log the component actually under test rather than the literal parameter name.
+    LOG.info("Running JoinValidate with " + componentName + " reportNonFatalException");
+    JoinValidateConfigured joinValidate = new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT,
+        lhsContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, componentName);
+    DAG dag = joinValidate.createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
+        HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3);
+    DAGStatus dagStatus = tezClient.submitDAG(dag)
+        .waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+    assertEquals(DAGStatus.State.FAILED, dagStatus.getState());
+
+    boolean foundDiag = false;
+    for (String diag : dagStatus.getDiagnostics()) {
+      if (diag.contains(ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE)
+          && diag.contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())) {
+        foundDiag = true;
+        break;
+      }
+    }
+    assertTrue(foundDiag);
+  }
+
 }