You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/09/04 15:19:04 UTC

[1/2] tez git commit: TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 7d412b203 -> 1b30b17db


http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index c79da5d..3382a9b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -44,7 +44,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import com.google.common.base.Throwables;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
@@ -59,6 +58,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
@@ -487,7 +487,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return false;
   }
 
-  private void initializeGroupInputs() {
+  private void initializeGroupInputs() throws TezException {
     if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
@@ -560,7 +560,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return processorContext;
   }
 
-  private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) {
+  private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException {
     LOG.info("Creating Input");
     InputDescriptor inputDesc = inputSpec.getInputDescriptor();
     Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
@@ -576,14 +576,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private LogicalInput createMergedInput(InputDescriptor inputDesc,
                                          MergedInputContext mergedInputContext,
-                                         List<Input> constituentInputs) {
+                                         List<Input> constituentInputs) throws TezException {
     LogicalInput input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
         new Class[]{MergedInputContext.class, List.class},
         new Object[]{mergedInputContext, constituentInputs});
     return input;
   }
 
-  private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) {
+  private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException {
     LOG.info("Creating Output");
     OutputDescriptor outputDesc = outputSpec.getOutputDescriptor();
     Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(),
@@ -599,7 +599,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private AbstractLogicalIOProcessor createProcessor(
-      String processorClassName, ProcessorContext processorContext) {
+      String processorClassName, ProcessorContext processorContext) throws TezException {
     Processor processor = ReflectionUtils.createClazzInstance(processorClassName,
         new Class[]{ProcessorContext.class}, new Object[]{processorContext});
     if (!(processor instanceof AbstractLogicalIOProcessor)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index bb6184e..2622b1f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.EntityDescriptor;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.InputContext;
@@ -100,8 +101,9 @@ public class MemoryDistributor {
   /**
    * Used by the Tez framework to distribute initial memory after components
    * have made their initial requests.
+   * @throws TezException
    */
-  public void makeInitialAllocations() {
+  public void makeInitialAllocations() throws TezException {
     Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory");
     Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory");
     Iterable<InitialMemoryRequestContext> requestContexts = Iterables.transform(requestList,

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
index 951a877..7fbd87f 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
@@ -47,7 +48,7 @@ public class TestMemoryDistributor {
   }
   
   @Test(timeout = 5000)
-  public void testScalingNoProcessor() {
+  public void testScalingNoProcessor() throws TezException {
     MemoryDistributor dist = new MemoryDistributor(2, 1, conf);
     
     dist.setJvmMemory(10000l);
@@ -81,7 +82,7 @@ public class TestMemoryDistributor {
   }
   
   @Test(timeout = 5000)
-  public void testScalingNoProcessor2() {
+  public void testScalingNoProcessor2() throws TezException {
     // Real world values
     MemoryDistributor dist = new MemoryDistributor(2, 0, conf);
     
@@ -106,7 +107,7 @@ public class TestMemoryDistributor {
   }
   
   @Test(timeout = 5000)
-  public void testScalingProcessor() {
+  public void testScalingProcessor() throws TezException {
     MemoryDistributor dist = new MemoryDistributor(2, 1, conf);
     
     dist.setJvmMemory(10000l);
@@ -148,7 +149,7 @@ public class TestMemoryDistributor {
   }
   
   @Test(timeout = 5000)
-  public void testScalingDisabled() {
+  public void testScalingDisabled() throws TezException {
     // Real world values
     Configuration conf = new Configuration(this.conf);
     conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, false);
@@ -175,7 +176,7 @@ public class TestMemoryDistributor {
   }
   
   @Test(timeout = 5000)
-  public void testReserveFractionConfigured() {
+  public void testReserveFractionConfigured() throws TezException {
     Configuration conf = new Configuration(this.conf);
     conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.5d);
     MemoryDistributor dist = new MemoryDistributor(2, 1, conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index ce9095a..2123757 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -43,6 +43,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -237,11 +238,11 @@ public class TestTaskExecution2 {
 
       TaskRunner2Result result = taskRunnerFuture.get();
       verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
-          new TezUncheckedException("Unchecked exception"), false);
+          new TezReflectionException("TezReflectionException"), false);
 
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskFailedEvent("Failure while running task",
-          ":org.apache.tez.dag.api.TezUncheckedException: "
+          ":org.apache.tez.dag.api.TezReflectionException: "
               + "Unable to load class: NotExitedProcessor");
       // Failure detected as a result of fall off from the run method. abort isn't required.
       assertFalse(TestProcessor.wasAborted());

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index b34accd..a38497c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -51,7 +52,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   }
   
   @Test(timeout = 5000)
-  public void testSimpleWeightedScaling() {
+  public void testSimpleWeightedScaling() throws TezException {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
         WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
@@ -98,7 +99,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   }
 
   @Test(timeout = 5000)
-  public void testAdditionalReserveFractionWeightedScaling() {
+  public void testAdditionalReserveFractionWeightedScaling() throws TezException {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
         WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1));

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 5663e62..db1b1e1 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -90,6 +90,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
@@ -887,7 +888,7 @@ public class TestMRRJobsDAGApi {
         LOG.info("Class found");
         FileSystem fs = FileSystem.get(conf);
         fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
-      } catch (TezUncheckedException e) {
+      } catch (TezReflectionException e) {
         LOG.info("Class not found");
       }
 


[2/2] tez git commit: TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)

Posted by zj...@apache.org.
TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1b30b17d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1b30b17d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1b30b17d

Branch: refs/heads/master
Commit: 1b30b17dbbd4d1f58539a0b61fae289d09c1b303
Parents: 7d412b2
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Sep 4 21:18:46 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Sep 4 21:18:46 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../org/apache/tez/client/FrameworkClient.java  |   8 +-
 .../java/org/apache/tez/client/TezClient.java   |   3 +-
 .../org/apache/tez/common/ReflectionUtils.java  |  43 ++---
 .../tez/dag/api/TezReflectionException.java     |  35 ++++
 .../dag/api/client/TimelineReaderFactory.java   |  19 ++-
 .../apache/tez/common/TestReflectionUtils.java  |   3 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   6 +-
 .../tez/dag/app/TaskCommunicatorManager.java    |   8 +-
 .../app/dag/RootInputInitializerManager.java    |   5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  27 +++-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  13 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  29 +++-
 .../tez/dag/app/dag/impl/VertexManager.java     |   3 +-
 .../app/launcher/ContainerLauncherManager.java  |  25 ++-
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  25 ++-
 .../dag/app/TestTaskCommunicatorManager.java    |  13 +-
 .../dag/app/TestTaskCommunicatorManager1.java   |   6 +-
 .../dag/app/TestTaskCommunicatorManager2.java   |   4 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 103 ++++++++++++
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |   3 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 162 +++++++++++++++++--
 .../launcher/TestContainerLauncherManager.java  |  16 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    |   8 +-
 .../TestHistoryEventsProtoConversion.java       |   7 +-
 .../hadoop/mapred/split/TezGroupedSplit.java    |  13 +-
 .../split/TezGroupedSplitsInputFormat.java      |  14 +-
 .../hadoop/mapreduce/split/TezGroupedSplit.java |  11 +-
 .../split/TezGroupedSplitsInputFormat.java      |  13 +-
 .../logging/ats/ATSHistoryLoggingService.java   |   3 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  12 +-
 .../common/resources/MemoryDistributor.java     |   4 +-
 .../common/resources/TestMemoryDistributor.java |  11 +-
 .../tez/runtime/task/TestTaskExecution2.java    |   7 +-
 .../TestWeightedScalingMemoryDistributor.java   |   5 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   3 +-
 36 files changed, 511 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d996ff..72b2c97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2745. ClassNotFoundException of user code should fail dag
   TEZ-2754. Tez UI: StartTime & EndTime is not displayed with right format in Graphical View
   TEZ-2752. logUnsuccessful completion in Attempt should write original finish
   time to ATS
@@ -163,6 +164,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2745. ClassNotFoundException of user code should fail dag
   TEZ-2761. Tez UI: update the progress on the dag and vertices pages with info from AM
   TEZ-2731. Fix Tez GenericCounter performance bottleneck
   TEZ-2752. logUnsuccessful completion in Attempt should write original finish
@@ -402,6 +404,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2745. ClassNotFoundException of user code should fail dag
   TEZ-2752. logUnsuccessful completion in Attempt should write original finish
   time to ATS
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
@@ -618,6 +621,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2745. ClassNotFoundException of user code should fail dag
   TEZ-2752. logUnsuccessful completion in Attempt should write original finish
   time to ATS
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index e1c7d00..cb20f49 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.TezUncheckedException;
 
 @Private
 public abstract class FrameworkClient {
@@ -39,7 +41,11 @@ public abstract class FrameworkClient {
 
     boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     if (isLocal) {
-      return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+      try {
+        return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+      } catch (TezReflectionException e) {
+        throw new TezUncheckedException("Fail to create LocalClient", e);
+      }
     }
     return new TezYarnClient(YarnClient.createYarnClient());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index e39cf4f..0c50d86 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -356,7 +357,7 @@ public class TezClient {
         historyACLPolicyManager = ReflectionUtils.createClazzInstance(
             atsHistoryACLManagerClassName);
         historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
-      } catch (TezUncheckedException e) {
+      } catch (TezReflectionException e) {
         if (!amConfig.getTezConfiguration().getBoolean(
             TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
             TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
index f1eb0ae..4d89ed4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 @Private
@@ -36,55 +37,44 @@ public class ReflectionUtils {
   private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
 
   @Private
-  public static Class<?> getClazz(String className) {
+  public static Class<?> getClazz(String className) throws TezReflectionException {
     Class<?> clazz = CLAZZ_CACHE.get(className);
     if (clazz == null) {
       try {
         clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
       } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Unable to load class: " + className, e);
+        throw new TezReflectionException("Unable to load class: " + className, e);
       }
     }
     return clazz;
   }
 
-  private static <T> T getNewInstance(Class<T> clazz) {
+  private static <T> T getNewInstance(Class<T> clazz) throws TezReflectionException {
     T instance;
     try {
       instance = clazz.newInstance();
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException(
+    } catch (Exception e) {
+      throw new TezReflectionException(
           "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
     }
     return instance;
   }
 
-  private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters) {
+  private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters)
+    throws TezReflectionException {
     T instance;
     try {
       Constructor<T> constructor = clazz.getConstructor(parameterTypes);
       instance = constructor.newInstance(parameters);
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
-    } catch (NoSuchMethodException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
-    } catch (InvocationTargetException e) {
-      throw new TezUncheckedException(
+    } catch (Exception e) {
+      throw new TezReflectionException(
           "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
     }
     return instance;
   }
 
   @Private
-  public static <T> T createClazzInstance(String className) {
+  public static <T> T createClazzInstance(String className) throws TezReflectionException {
     Class<?> clazz = getClazz(className);
     @SuppressWarnings("unchecked")
     T instance = (T) getNewInstance(clazz);
@@ -92,7 +82,8 @@ public class ReflectionUtils {
   }
 
   @Private
-  public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters) {
+  public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters)
+    throws TezReflectionException {
     Class<?> clazz = getClazz(className);
     @SuppressWarnings("unchecked")
     T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
@@ -101,20 +92,20 @@ public class ReflectionUtils {
 
   @Private
   @SuppressWarnings("unchecked")
-  public static <T> T invokeMethod(Object target, Method method, Object... args) {
+  public static <T> T invokeMethod(Object target, Method method, Object... args) throws TezReflectionException {
     try {
       return (T) method.invoke(target, args);
     } catch (Exception e) {
-      throw new TezUncheckedException(e);
+      throw new TezReflectionException(e);
     }
   }
 
   @Private
-  public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) {
+  public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) throws TezReflectionException {
     try {
       return targetClazz.getMethod(methodName, parameterTypes);
     } catch (NoSuchMethodException e) {
-      throw new TezUncheckedException(e);
+      throw new TezReflectionException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
new file mode 100644
index 0000000..4d8d1e0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class TezReflectionException extends TezException {
+
+  private static final long serialVersionUID = 7744789121243630729L;
+
+  public TezReflectionException(String message) {
+    super(message);
+  }
+
+  public TezReflectionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public TezReflectionException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
index f544198..c0569dd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,7 +166,7 @@ public class TimelineReaderFactory {
       try {
         authenticator = getTokenAuthenticator();
         authenticator.setConnectionConfigurator(connectionConfigurator);
-      } catch (TezUncheckedException e) {
+      } catch (TezException e) {
         throw new IOException("Failed to get authenticator", e);
       }
 
@@ -179,13 +178,17 @@ public class TimelineReaderFactory {
         doAsUser = null;
       }
 
-      HttpURLConnectionFactory connectionFactory =
-          new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
-              authUgi, doAsUser);
+      HttpURLConnectionFactory connectionFactory;
+      try {
+        connectionFactory = new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
+            authUgi, doAsUser);
+      } catch (TezException e) {
+        throw new IOException("Fail to create TokenAuthenticatedURLConnectionFactory", e);
+      }
       return new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
     }
 
-    private static Authenticator getTokenAuthenticator() {
+    private static Authenticator getTokenAuthenticator() throws TezException {
       String authenticatorClazzName;
 
       if (UserGroupInformation.isSecurityEnabled()) {
@@ -208,7 +211,7 @@ public class TimelineReaderFactory {
       public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator,
                                                     Authenticator authenticator,
                                                     UserGroupInformation authUgi,
-                                                    String doAsUser) {
+                                                    String doAsUser) throws TezException {
         this.connConfigurator = connConfigurator;
         this.authenticator = authenticator;
         this.authUgi = authUgi;
@@ -377,7 +380,7 @@ public class TimelineReaderFactory {
 
         isTokenDelegationClassesPresent = true;
 
-      } catch (TezUncheckedException e) {
+      } catch (TezException e) {
         LOG.info("Could not find class required for token delegation, will fallback to pseudo auth");
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
index 253e3a7..2fbd35c 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.junit.Test;
 
 public class TestReflectionUtils {
@@ -45,7 +46,7 @@ public class TestReflectionUtils {
   }
 
   @Test(timeout = 5000)
-  public void testConstructorWithParameters()
+  public void testConstructorWithParameters() throws TezReflectionException
   {
     Class<?>[] parameterTypes = new Class[] { String.class, Integer.TYPE };
     Object[] parameters = new Object[] { new String("test"), 1 };

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 04c7b82..fee13c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -59,6 +59,7 @@ import java.util.regex.Pattern;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
@@ -1053,7 +1054,8 @@ public class DAGAppMaster extends AbstractService {
   protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context,
                                                                            TaskHeartbeatHandler thh,
                                                                            ContainerHeartbeatHandler chh,
-                                                                           List<NamedEntityDescriptor> entityDescriptors) {
+                                                                           List<NamedEntityDescriptor> entityDescriptors)
+                                                                               throws TezException {
     TaskCommunicatorManagerInterface tcm =
         new TaskCommunicatorManager(context, thh, chh, entityDescriptors);
     return tcm;
@@ -1079,7 +1081,7 @@ public class DAGAppMaster extends AbstractService {
   protected ContainerLauncherManager createContainerLauncherManager(
       List<NamedEntityDescriptor> containerLauncherDescriptors,
       boolean isLocal) throws
-      UnknownHostException {
+      UnknownHostException, TezException {
     return new ContainerLauncherManager(context, taskCommunicatorManager, workingDirectory,
         containerLauncherDescriptors, isLocal);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 42df259..cfb34ac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.collections4.ListUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
@@ -103,7 +104,7 @@ public class TaskCommunicatorManager extends AbstractService implements
 
   public TaskCommunicatorManager(AppContext context,
                                  TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
-                                 List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+                                 List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
     super(TaskCommunicatorManager.class.getName());
     this.context = context;
     this.taskHeartbeatHandler = thh;
@@ -141,7 +142,7 @@ public class TaskCommunicatorManager extends AbstractService implements
 
   @VisibleForTesting
   TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
-                                          int taskCommIndex) {
+                                          int taskCommIndex) throws TezException {
     if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
       return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
     } else if (taskCommDescriptor.getEntityName()
@@ -167,7 +168,8 @@ public class TaskCommunicatorManager extends AbstractService implements
 
   @VisibleForTesting
   TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
-                                                NamedEntityDescriptor taskCommDescriptor) {
+                                                NamedEntityDescriptor taskCommDescriptor)
+                                                    throws TezException {
     LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
         taskCommDescriptor.getClassName());
     Class<? extends TaskCommunicator> taskCommClazz =

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4ee00fa..4a8a286 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.*;
 import org.apache.tez.dag.api.event.VertexState;
@@ -106,7 +107,7 @@ public class RootInputInitializerManager {
   }
   
   public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
-      inputs) {
+      inputs) throws TezException {
     for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
 
       InputInitializerContext context =
@@ -133,7 +134,7 @@ public class RootInputInitializerManager {
 
   @VisibleForTesting
   protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
-      input, InputInitializerContext context) {
+      input, InputInitializerContext context) throws TezException {
     InputInitializer initializer = ReflectionUtils
         .createClazzInstance(input.getControllerDescriptor().getClassName(),
             new Class[]{InputInitializerContext.class}, new Object[]{context});

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 756ed28..da9c416 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1467,7 +1467,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     }
 
-    createDAGEdges(this);
+    try {
+      createDAGEdges(this);
+    } catch (TezException e2) {
+      String msg = "Fail to create edges, " + ExceptionUtils.getStackTrace(e2);
+      addDiagnostic(msg);
+      LOG.error(msg);
+      trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
+      finished(DAGState.FAILED);
+      return DAGState.FAILED;
+    }
     Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
 
     // setup the dag
@@ -1489,7 +1498,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     }
 
-    assignDAGScheduler(this);
+    try {
+      assignDAGScheduler(this);
+    } catch (TezException e1) {
+      String msg = "Fail to assign DAGScheduler for dag:" + dagName + " due to "
+          + ExceptionUtils.getStackTrace(e1);
+      LOG.error(msg);
+      addDiagnostic(msg);
+      trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
+      finished(DAGState.FAILED);
+      return DAGState.FAILED;
+    }
 
     for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
       String groupName = entry.getKey();
@@ -1510,7 +1529,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return DAGState.INITED;
   }
 
-  private void createDAGEdges(DAGImpl dag) {
+  private void createDAGEdges(DAGImpl dag) throws TezException {
     for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
       EdgeProperty edgeProperty = DagTypeConverters
           .createEdgePropertyMapFromDAGPlan(edgePlan);
@@ -1521,7 +1540,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  private static void assignDAGScheduler(DAGImpl dag) {
+  private static void assignDAGScheduler(DAGImpl dag) throws TezException {
     String dagSchedulerClassName = dag.dagConf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
         TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
     LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index da74a46..0be7790 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
@@ -119,14 +120,14 @@ public class Edge {
       .newConcurrentMap();
 
   @SuppressWarnings("rawtypes")
-  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) {
+  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) throws TezException {
     this.edgeProperty = edgeProperty;
     this.eventHandler = eventHandler;
     this.conf = conf;
     createEdgeManager();
   }
 
-  private void createEdgeManager() {
+  private void createEdgeManager() throws TezException {
     switch (edgeProperty.getDataMovementType()) {
       case ONE_TO_ONE:
         edgeManagerContext = new EdgeManagerPluginContextImpl(null);
@@ -160,7 +161,7 @@ public class Edge {
       default:
         String message = "Unknown edge data movement type: "
             + edgeProperty.getDataMovementType();
-        throw new TezUncheckedException(message);
+        throw new TezException(message);
     }
   }
 
@@ -182,7 +183,11 @@ public class Edge {
   public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
     this.edgeProperty = newEdgeProperty;
     boolean wasUnInitialized = (edgeManager == null);
-    createEdgeManager();
+    try {
+      createEdgeManager();
+    } catch (TezException e) {
+      throw new AMUserCodeException(Source.EdgeManager, e);
+    }
     initialize();
     if (wasUnInitialized) {
       sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this,

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f4dd7dc..3dae42b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -2469,7 +2470,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
     }
 
-    assignVertexManager();
+    try {
+      assignVertexManager();
+    } catch (TezException e1) {
+      String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1);
+      LOG.error(msg);
+      return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+    }
 
     try {
       vertexManager.initialize();
@@ -2512,7 +2519,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return VertexState.INITED;
   }
 
-  private void assignVertexManager() {
+  private void assignVertexManager() throws TezException {
     boolean hasBipartite = false;
     boolean hasOneToOne = false;
     boolean hasCustom = false;
@@ -3359,7 +3366,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
         if (vertex.inputsWithInitializers != null) {
           LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
-          vertex.setupInputInitializerManager();
+          try {
+            vertex.setupInputInitializerManager();
+          } catch (TezException e) {
+            String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
+            LOG.info(msg);
+            return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+          }
           return VertexState.INITIALIZING;
         } else {
           boolean hasOneToOneUninitedSource = false;
@@ -3390,7 +3403,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         // this block may return VertexState.INITIALIZING
         if (vertex.inputsWithInitializers != null) {
           LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
-          vertex.setupInputInitializerManager();
+          try {
+            vertex.setupInputInitializerManager();
+          } catch (TezException e) {
+            String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
+            LOG.error(msg);
+            return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+          }
           return VertexState.INITIALIZING;
         }
         if (!vertex.uninitializedEdges.isEmpty()) {
@@ -4560,7 +4579,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
-  private void setupInputInitializerManager() {
+  private void setupInputInitializerManager() throws TezException {
     rootInputInitializerManager = createRootInputInitializerManager(
         getDAG().getName(), getName(), getVertexId(),
         eventHandler, getTotalTasks(),

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index bb512a9..32f7a42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -43,6 +43,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -368,7 +369,7 @@ public class VertexManager {
   }
 
   public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi,
-      Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
+      Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) throws TezException {
     checkNotNull(pluginDesc, "pluginDesc is null");
     checkNotNull(managedVertex, "managedVertex is null");
     checkNotNull(appContext, "appContext is null");

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 15a10bd..9e56f44 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -14,19 +14,19 @@
 
 package org.apache.tez.dag.app.launcher;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.net.UnknownHostException;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
@@ -70,7 +70,7 @@ public class ContainerLauncherManager extends AbstractService
                                   TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                   String workingDirectory,
                                   List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                  boolean isPureLocalMode) {
+                                  boolean isPureLocalMode) throws TezException {
     super(ContainerLauncherManager.class.getName());
 
     this.appContext = context;
@@ -101,7 +101,7 @@ public class ContainerLauncherManager extends AbstractService
       TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
       String workingDirectory,
       int containerLauncherIndex,
-      boolean isPureLocalMode) {
+      boolean isPureLocalMode) throws TezException {
     if (containerLauncherDescriptor.getEntityName().equals(
         TezConstants.getTezYarnServicePluginName())) {
       return createYarnContainerLauncher(containerLauncherContext);
@@ -144,20 +144,13 @@ public class ContainerLauncherManager extends AbstractService
   @VisibleForTesting
   @SuppressWarnings("unchecked")
   ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
-                                                  NamedEntityDescriptor containerLauncherDescriptor) {
+                                                  NamedEntityDescriptor containerLauncherDescriptor)
+                                                      throws TezException {
     LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
         containerLauncherDescriptor.getClassName());
-    Class<? extends ContainerLauncher> containerLauncherClazz =
-        (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-            containerLauncherDescriptor.getClassName());
-    try {
-      Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
-          .getConstructor(ContainerLauncherContext.class);
-      return ctor.newInstance(containerLauncherContext);
-    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
-      throw new TezUncheckedException(e);
-    }
-
+    return ReflectionUtils.createClazzInstance(containerLauncherDescriptor.getClassName(),
+        new Class[]{ContainerLauncherContext.class},
+        new Object[]{containerLauncherContext});
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 29143a2..04d7089 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.tez.dag.app.rm;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -33,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
@@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -392,7 +392,7 @@ public class TaskSchedulerManager extends AbstractService implements
                                                    AppContext appContext,
                                                    NamedEntityDescriptor taskSchedulerDescriptor,
                                                    long customAppIdIdentifier,
-                                                   int schedulerId) {
+                                                   int schedulerId) throws TezException {
     TaskSchedulerContext rawContext =
         new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
             customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
@@ -429,24 +429,17 @@ public class TaskSchedulerManager extends AbstractService implements
   @SuppressWarnings("unchecked")
   TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
                                           NamedEntityDescriptor taskSchedulerDescriptor,
-                                          int schedulerId) {
+                                          int schedulerId) throws TezException {
     LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
         taskSchedulerDescriptor.getClassName());
-    Class<? extends TaskScheduler> taskSchedulerClazz =
-        (Class<? extends TaskScheduler>) ReflectionUtils
-            .getClazz(taskSchedulerDescriptor.getClassName());
-    try {
-      Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
-          .getConstructor(TaskSchedulerContext.class);
-      return ctor.newInstance(taskSchedulerContext);
-    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
-      throw new TezUncheckedException(e);
-    }
+    return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(),
+        new Class[]{TaskSchedulerContext.class},
+        new Object[]{taskSchedulerContext});
   }
 
   @VisibleForTesting
   protected void instantiateSchedulers(String host, int port, String trackingUrl,
-                                       AppContext appContext) {
+                                       AppContext appContext) throws TezException {
     // Iterate over the list and create all the taskSchedulers
     int j = 0;
     for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
@@ -467,7 +460,7 @@ public class TaskSchedulerManager extends AbstractService implements
 
   
   @Override
-  public synchronized void serviceStart() {
+  public synchronized void serviceStart() throws Exception {
     InetSocketAddress serviceAddr = clientService.getBindAddress();
     dagAppMaster = appContext.getAppMaster();
     // if web service is enabled then set tracking url. else disable it (value = "").

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index be7adde..1cd8bb1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -66,7 +67,7 @@ public class TestTaskCommunicatorManager {
   }
 
   @Test(timeout = 5000)
-  public void testNoTaskCommSpecified() throws IOException {
+  public void testNoTaskCommSpecified() throws IOException, TezException {
 
     AppContext appContext = mock(AppContext.class);
     TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -83,7 +84,7 @@ public class TestTaskCommunicatorManager {
   }
 
   @Test(timeout = 5000)
-  public void testCustomTaskCommSpecified() throws IOException {
+  public void testCustomTaskCommSpecified() throws IOException, TezException {
 
     AppContext appContext = mock(AppContext.class);
     TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -118,7 +119,7 @@ public class TestTaskCommunicatorManager {
   }
 
   @Test(timeout = 5000)
-  public void testMultipleTaskComms() throws IOException {
+  public void testMultipleTaskComms() throws IOException, TezException {
 
     AppContext appContext = mock(AppContext.class);
     TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -250,13 +251,13 @@ public class TestTaskCommunicatorManager {
     public TaskCommManagerForMultipleCommTest(AppContext context,
                                               TaskHeartbeatHandler thh,
                                               ContainerHeartbeatHandler chh,
-                                              List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+                                              List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
       super(context, thh, chh, taskCommunicatorDescriptors);
     }
 
     @Override
     TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
-                                            int taskCommIndex) {
+                                            int taskCommIndex) throws TezException {
       numTaskComms.incrementAndGet();
       boolean added = taskCommIndices.add(taskCommIndex);
       assertTrue("Cannot add multiple taskComms with the same index", added);
@@ -283,7 +284,7 @@ public class TestTaskCommunicatorManager {
 
     @Override
     TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
-                                                  NamedEntityDescriptor taskCommDescriptor) {
+                                                  NamedEntityDescriptor taskCommDescriptor) throws TezException {
       taskCommContexts.add(taskCommunicatorContext);
       TaskCommunicator spyComm =
           spy(super.createCustomTaskCommunicator(taskCommunicatorContext, taskCommDescriptor));

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index e8ce429..117d3b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -107,7 +107,7 @@ public class TestTaskCommunicatorManager1 {
   TezTaskAttemptID taskAttemptID;
 
   @Before
-  public void setUp() {
+  public void setUp() throws TezException {
     appId = ApplicationId.newInstance(1000, 1);
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     dag = mock(DAG.class);
@@ -304,7 +304,7 @@ public class TestTaskCommunicatorManager1 {
 
   // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
   @Test (timeout= 5000)
-  public void testPortRange_NotSpecified() throws IOException {
+  public void testPortRange_NotSpecified() throws IOException, TezException {
     Configuration conf = new Configuration();
     JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
         "fakeIdentifier"));
@@ -396,7 +396,7 @@ public class TestTaskCommunicatorManager1 {
     public TaskCommunicatorManagerInterfaceImplForTest(AppContext context,
                                                        TaskHeartbeatHandler thh,
                                                        ContainerHeartbeatHandler chh,
-                                                       List<NamedEntityDescriptor> taskCommDescriptors) {
+                                                       List<NamedEntityDescriptor> taskCommDescriptors) throws TezException {
       super(context, thh, chh, taskCommDescriptors);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
index d75b0e5..a7652a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -41,6 +42,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
@@ -58,7 +60,7 @@ import org.mockito.ArgumentCaptor;
 public class TestTaskCommunicatorManager2 {
 
   @Test(timeout = 5000)
-  public void testTaskAttemptFailedKilled() throws IOException {
+  public void testTaskAttemptFailedKilled() throws IOException, TezException {
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     Credentials credentials = new Credentials();

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ac4f61b..676ae33 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -744,6 +744,76 @@ public class TestDAGImpl {
     return dag;
   }
 
+  // v1 -> v2
+  private DAGPlan createDAGWithNonExistEdgeManager() {
+    LOG.info("Setting up dag plan with non-exist edgemanager");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testverteximpl")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host1")
+                        .addRack("rack1")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+          .addVertex(
+              VertexPlan.newBuilder()
+                  .setName("vertex2")
+                  .setType(PlanVertexType.NORMAL)
+                  .setProcessorDescriptor(
+                      TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
+                  .addTaskLocationHint(
+                      PlanTaskLocationHint.newBuilder()
+                          .addHost("host2")
+                          .addRack("rack2")
+                          .build()
+                  )
+                  .setTaskConfig(
+                      PlanTaskConfiguration.newBuilder()
+                          .setNumTasks(2)
+                          .setVirtualCores(4)
+                          .setMemoryMb(1024)
+                          .setJavaOpts("foo")
+                          .setTaskModule("x2.y2")
+                          .build()
+                  )
+                  .addInEdgeId("e1")
+                  .build()
+          )
+         .addEdge(
+             EdgePlan.newBuilder()
+                 .setEdgeManager(TezEntityDescriptorProto.newBuilder()
+                         .setClassName("non-exist-edge-manager")
+                 )
+                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                 .setInputVertexName("vertex1")
+                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+                 .setOutputVertexName("vertex2")
+                 .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                 .setId("e1")
+                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                 .build()
+         )
+          .build();
+    return dag;
+  }
+
   @BeforeClass
   public static void beforeClass() {
     MockDNSToSwitchMapping.initializeMockRackResolver();
@@ -969,6 +1039,39 @@ public class TestDAGImpl {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testNonExistEdgeManagerPlugin() {
+    dagPlan = createDAGWithNonExistEdgeManager();
+    dag = new DAGImpl(dagId, conf, dagPlan,
+        dispatcher.getEventHandler(),  taskCommunicatorManagerInterface,
+        fsTokens, clock, "user", thh, appContext);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+    doReturn(dag).when(appContext).getCurrentDAG();
+
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "")
+        .contains("java.lang.ClassNotFoundException: non-exist-edge-manager"));
+  }
+
+  @Test (timeout = 5000)
+  public void testNonExistDAGScheduler() {
+    conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, "non-exist-dag-scheduler");
+    dag = new DAGImpl(dagId, conf, dagPlan,
+        dispatcher.getEventHandler(),  taskCommunicatorManagerInterface,
+        fsTokens, clock, "user", thh, appContext);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+    doReturn(dag).when(appContext).getCurrentDAG();
+
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "")
+        .contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler"));
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexCompletion() {

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index eb03d1e..f53e505 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -54,6 +54,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -158,7 +159,7 @@ public class TestEdge {
 
   @SuppressWarnings({ "rawtypes" })
   @Test (timeout = 5000)
-  public void testCompositeEventHandling() throws AMUserCodeException {
+  public void testCompositeEventHandling() throws TezException {
     EventHandler eventHandler = mock(EventHandler.class);
     EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER,
         DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class),

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index eb68a6f..a54c56a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -575,6 +576,100 @@ public class TestVertexImpl {
     return dag;
   }
 
+  private DAGPlan createDAGPlanWithNonExistInputInitializer() {
+    LOG.info("Setting up dag plan with non exist inputinitializer");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("initializerWith0Tasks")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                "non-exist-input-initializer"))
+                        .setName("input1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .build()
+        ).build();
+    return dag;
+  }
+
+  private DAGPlan createDAGPlanWithNonExistOutputCommitter() {
+    LOG.info("Setting up dag plan with non exist output committer");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("initializerWith0Tasks")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addOutputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                "non-exist-output-committer"))
+                        .setName("output1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("OutputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .build()
+        ).build();
+    return dag;
+  }
+
+  private DAGPlan createDAGPlanWithNonExistVertexManager() {
+    LOG.info("Setting up dag plan with non-exist VertexManager");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("initializerWith0Tasks")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName("non-exist-vertexmanager"))
+                .build()
+        ).build();
+     return dag;
+  }
+
   private DAGPlan createDAGPlanWithMixedEdges() {
     LOG.info("Setting up mixed edge dag plan");
     org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create("MixedEdges");
@@ -2151,7 +2246,7 @@ public class TestVertexImpl {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void setupPostDagCreation() throws AMUserCodeException {
+  public void setupPostDagCreation() throws TezException {
     String dagName = "dag0";
     // dispatcher may be created multiple times (setupPostDagCreation may be called multiples)
     if (dispatcher != null) {
@@ -2266,7 +2361,7 @@ public class TestVertexImpl {
   }
 
   @Before
-  public void setup() throws AMUserCodeException {
+  public void setup() throws TezException {
     useCustomInitializer = false;
     customInitializer = null;
     setupPreDagCreation();
@@ -2385,6 +2480,45 @@ public class TestVertexImpl {
         .getOutputDescriptor().getClassName()));
   }
 
+  @Test(timeout=5000)
+  public void testNonExistVertexManager() throws TezException {
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithNonExistVertexManager();
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"")
+        .contains("java.lang.ClassNotFoundException: non-exist-vertexmanager"));
+  }
+
+  @Test(timeout=5000)
+  public void testNonExistInputInitializer() throws TezException {
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithNonExistInputInitializer();
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"")
+        .contains("java.lang.ClassNotFoundException: non-exist-input-initializer"));
+  }
+
+  @Test(timeout=5000)
+  public void testNonExistOutputCommitter() throws TezException {
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithNonExistOutputCommitter();
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"")
+        .contains("java.lang.ClassNotFoundException: non-exist-output-committer"));
+  }
+
   class TestUpdateListener implements VertexStateUpdateListener {
     List<VertexStateUpdate> events = Lists.newLinkedList();
     @Override
@@ -3734,7 +3868,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexManagerHeuristic() throws AMUserCodeException {
+  public void testVertexManagerHeuristic() throws TezException {
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMixedEdges();
     setupPostDagCreation();
@@ -3991,7 +4125,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 10000)
-  public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, AMUserCodeException {
+  public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, TezException {
     useCustomInitializer = true;
     customInitializer = new RootInitializerSettingParallelismTo0(null);
     RootInitializerSettingParallelismTo0 initializer =
@@ -4533,7 +4667,7 @@ public class TestVertexImpl {
    * If broadcast, one-to-one or custom edges are present in source, tasks should not start until
    * 1 task from each source vertex is complete.
    */
-  public void testTaskSchedulingWithCustomEdges() throws AMUserCodeException {
+  public void testTaskSchedulingWithCustomEdges() throws TezException {
     setupPreDagCreation();
     dagPlan = createCustomDAGWithCustomEdges();
     setupPostDagCreation();
@@ -5359,7 +5493,7 @@ public class TestVertexImpl {
       hasShutDown = true;
     }
 
-    public void failInputInitialization() {
+    public void failInputInitialization() throws TezException {
       super.runInputInitializers(inputs);
       eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
           .get(0).getName(),
@@ -5408,7 +5542,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout=5000)
-  public void testVertexGroupInput() throws AMUserCodeException {
+  public void testVertexGroupInput() throws TezException {
     setupPreDagCreation();
     dagPlan = createVertexGroupDAGPlan();
     setupPostDagCreation();
@@ -5558,7 +5692,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testInitStartRace() throws AMUserCodeException {
+  public void testInitStartRace() throws TezException {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
@@ -5581,7 +5715,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testInitStartRace2() throws AMUserCodeException {
+  public void testInitStartRace2() throws TezException {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
@@ -5608,7 +5742,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testTez2684() throws AMUserCodeException, IOException {
+  public void testTez2684() throws IOException, TezException {
     setupPreDagCreation();
     dagPlan = createSamplerDAGPlan2();
     setupPostDagCreation();
@@ -5677,7 +5811,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testExceptionFromVM_Initialize() throws AMUserCodeException {
+  public void testExceptionFromVM_Initialize() throws TezException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.Initialize);
@@ -5837,7 +5971,7 @@ public class TestVertexImpl {
   }
   
   @Test(timeout = 5000)
-  public void testExceptionFromII_Initialize() throws AMUserCodeException, InterruptedException {
+  public void testExceptionFromII_Initialize() throws InterruptedException, TezException {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize);
     EventHandlingRootInputInitializer initializer =
@@ -5960,7 +6094,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException {
+  public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedException, TezException {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
     EventHandlingRootInputInitializer initializer =
@@ -5989,7 +6123,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException {
+  public void testExceptionFromII_InitSucceededAfterInitFailure() throws InterruptedException, TezException {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
     EventHandlingRootInputInitializer initializer =

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
index 4b931d4..a8af808 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
@@ -61,7 +63,7 @@ public class TestContainerLauncherManager {
   }
 
   @Test(timeout = 5000)
-  public void testNoLaunchersSpecified() throws IOException {
+  public void testNoLaunchersSpecified() throws IOException, TezException {
 
     AppContext appContext = mock(AppContext.class);
     TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
@@ -77,7 +79,7 @@ public class TestContainerLauncherManager {
   }
 
   @Test(timeout = 5000)
-  public void testCustomLauncherSpecified() throws IOException {
+  public void testCustomLauncherSpecified() throws IOException, TezException {
     Configuration conf = new Configuration(false);
 
     AppContext appContext = mock(AppContext.class);
@@ -111,7 +113,7 @@ public class TestContainerLauncherManager {
   }
 
   @Test(timeout = 5000)
-  public void testMultipleContainerLaunchers() throws IOException {
+  public void testMultipleContainerLaunchers() throws IOException, TezException {
     Configuration conf = new Configuration(false);
     conf.set("testkey", "testvalue");
     UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
@@ -261,7 +263,7 @@ public class TestContainerLauncherManager {
                                                          String workingDirectory,
                                                          List<NamedEntityDescriptor> containerLauncherDescriptors,
                                                          boolean isPureLocalMode) throws
-        UnknownHostException {
+        UnknownHostException, TezException {
       super(context, taskCommunicatorManagerInterface, workingDirectory,
           containerLauncherDescriptors, isPureLocalMode);
     }
@@ -273,7 +275,7 @@ public class TestContainerLauncherManager {
                                               TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                               String workingDirectory,
                                               int containerLauncherIndex,
-                                              boolean isPureLocalMode) {
+                                              boolean isPureLocalMode) throws TezException {
       numContainerLaunchers.incrementAndGet();
       boolean added = containerLauncherIndices.add(containerLauncherIndex);
       assertTrue("Cannot add multiple launchers with the same index", added);
@@ -306,7 +308,7 @@ public class TestContainerLauncherManager {
     @Override
     ContainerLauncher createCustomContainerLauncher(
         ContainerLauncherContext containerLauncherContext,
-        NamedEntityDescriptor containerLauncherDescriptor) {
+        NamedEntityDescriptor containerLauncherDescriptor) throws TezException {
       ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
           containerLauncherContext, containerLauncherDescriptor));
       testContainerLaunchers.add(spyLauncher);
@@ -338,7 +340,7 @@ public class TestContainerLauncherManager {
     }
   }
 
-  private static class FakeContainerLauncher extends ContainerLauncher {
+  public static class FakeContainerLauncher extends ContainerLauncher {
 
     public FakeContainerLauncher(
         ContainerLauncherContext containerLauncherContext) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 98b7baa..4db51b9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
@@ -532,7 +533,7 @@ public class TestTaskSchedulerManager {
         eq(launchRequest2));
   }
 
-  private static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager {
+  public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager {
 
     private final TaskScheduler yarnTaskScheduler;
     private final TaskScheduler uberTaskScheduler;
@@ -562,7 +563,7 @@ public class TestTaskSchedulerManager {
                                       AppContext appContext,
                                       NamedEntityDescriptor taskSchedulerDescriptor,
                                       long customAppIdIdentifier,
-                                      int schedulerId) {
+                                      int schedulerId) throws TezException {
 
       numCreateInvocations.incrementAndGet();
       boolean added = seenSchedulers.add(schedulerId);
@@ -596,7 +597,8 @@ public class TestTaskSchedulerManager {
 
     @Override
     TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                            NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) {
+                                            NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId)
+                                                throws TezException {
       taskSchedulerContexts.add(taskSchedulerContext);
       TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId));
       testTaskSchedulers.add(taskScheduler);

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 3507d99..b215a06 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -83,7 +84,7 @@ public class TestHistoryEventsProtoConversion {
       TestHistoryEventsProtoConversion.class);
 
 
-  private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException {
+  private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     HistoryEvent deserializedEvent = null;
     event.toProtoStream(os);
@@ -100,7 +101,7 @@ public class TestHistoryEventsProtoConversion {
   }
 
   private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent)
-      throws IOException {
+      throws IOException, TezException {
     SummaryEvent event = (SummaryEvent) historyEvent;
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     HistoryEvent deserializedEvent = null;
@@ -757,7 +758,7 @@ public class TestHistoryEventsProtoConversion {
       }
     }
 
-  private void testDAGRecoveredEvent() {
+  private void testDAGRecoveredEvent() throws TezException {
     DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
         TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index 0c1c327..4f3a0f2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 /**
@@ -106,10 +107,14 @@ public class TezGroupedSplit implements InputSplit, Configurable {
   public void readFields(DataInput in) throws IOException {
     wrappedInputFormatName = Text.readString(in);
     String inputSplitClassName = Text.readString(in);
-    Class<? extends InputSplit> clazz = 
-        (Class<? extends InputSplit>) 
-        TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
-    
+    Class<? extends InputSplit> clazz = null;
+    try {
+      clazz = (Class<? extends InputSplit>)
+      TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+
     int numSplits = in.readInt();
     
     wrappedSplits = new ArrayList<InputSplit>(numSplits);

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 707f9ad..b361aec 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TezException;
 
 import com.google.common.base.Preconditions;
 
@@ -93,24 +93,28 @@ public class TezGroupedSplitsInputFormat<K, V>
   public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
     TezGroupedSplit groupedSplit = (TezGroupedSplit) split;
-    initInputFormatFromSplit(groupedSplit);
+    try {
+      initInputFormatFromSplit(groupedSplit);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
     return new TezGroupedSplitsRecordReader(groupedSplit, job, reporter);
   }
   
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  void initInputFormatFromSplit(TezGroupedSplit split) {
+  void initInputFormatFromSplit(TezGroupedSplit split) throws TezException {
     if (wrappedInputFormat == null) {
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
         wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
-        throw new TezUncheckedException(e);
+        throw new TezException(e);
       }
     }
   }
 
-  static Class<?> getClassFromName(String name) {
+  static Class<?> getClassFromName(String name) throws TezException {
     return ReflectionUtils.getClazz(name);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index 9275f14..f85bbcd 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 /**
@@ -108,9 +109,13 @@ public class TezGroupedSplit extends InputSplit
   public void readFields(DataInput in) throws IOException {
     wrappedInputFormatName = Text.readString(in);
     String inputSplitClassName = Text.readString(in);
-    Class<? extends InputSplit> clazz = 
-        (Class<? extends InputSplit>) 
-        TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+    Class<? extends InputSplit> clazz = null;
+    try {
+      clazz = (Class<? extends InputSplit>)
+      TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
     
     int numSplits = in.readInt();
     

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 519b52a..8aabbf6 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -126,24 +127,28 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   public RecordReader<K, V> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
     TezGroupedSplit groupedSplit = (TezGroupedSplit) split;
-    initInputFormatFromSplit(groupedSplit);
+    try {
+      initInputFormatFromSplit(groupedSplit);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
     return new TezGroupedSplitsRecordReader(groupedSplit, context);
   }
   
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  void initInputFormatFromSplit(TezGroupedSplit split) {
+  void initInputFormatFromSplit(TezGroupedSplit split) throws TezException {
     if (wrappedInputFormat == null) {
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
         wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
-        throw new TezUncheckedException(e);
+        throw new TezException(e);
       }
     }
   }
   
-  static Class<?> getClassFromName(String name) {
+  static Class<?> getClassFromName(String name) throws TezException {
     return ReflectionUtils.getClazz(name);
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 9a2d77e..d0e935f 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -40,6 +40,7 @@ import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -128,7 +129,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       historyACLPolicyManager = ReflectionUtils.createClazzInstance(
           atsHistoryACLManagerClassName);
       historyACLPolicyManager.setConf(conf);
-    } catch (TezUncheckedException e) {
+    } catch (TezReflectionException e) {
       LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
           + ". ACLs cannot be enforced correctly for history data in Timeline", e);
       if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,