You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/09/04 15:19:05 UTC
[2/2] tez git commit: TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)
TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1b30b17d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1b30b17d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1b30b17d
Branch: refs/heads/master
Commit: 1b30b17dbbd4d1f58539a0b61fae289d09c1b303
Parents: 7d412b2
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Sep 4 21:18:46 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Sep 4 21:18:46 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../org/apache/tez/client/FrameworkClient.java | 8 +-
.../java/org/apache/tez/client/TezClient.java | 3 +-
.../org/apache/tez/common/ReflectionUtils.java | 43 ++---
.../tez/dag/api/TezReflectionException.java | 35 ++++
.../dag/api/client/TimelineReaderFactory.java | 19 ++-
.../apache/tez/common/TestReflectionUtils.java | 3 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +-
.../tez/dag/app/TaskCommunicatorManager.java | 8 +-
.../app/dag/RootInputInitializerManager.java | 5 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 27 +++-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 29 +++-
.../tez/dag/app/dag/impl/VertexManager.java | 3 +-
.../app/launcher/ContainerLauncherManager.java | 25 ++-
.../tez/dag/app/rm/TaskSchedulerManager.java | 25 ++-
.../dag/app/TestTaskCommunicatorManager.java | 13 +-
.../dag/app/TestTaskCommunicatorManager1.java | 6 +-
.../dag/app/TestTaskCommunicatorManager2.java | 4 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 103 ++++++++++++
.../apache/tez/dag/app/dag/impl/TestEdge.java | 3 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 162 +++++++++++++++++--
.../launcher/TestContainerLauncherManager.java | 16 +-
.../dag/app/rm/TestTaskSchedulerManager.java | 8 +-
.../TestHistoryEventsProtoConversion.java | 7 +-
.../hadoop/mapred/split/TezGroupedSplit.java | 13 +-
.../split/TezGroupedSplitsInputFormat.java | 14 +-
.../hadoop/mapreduce/split/TezGroupedSplit.java | 11 +-
.../split/TezGroupedSplitsInputFormat.java | 13 +-
.../logging/ats/ATSHistoryLoggingService.java | 3 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 12 +-
.../common/resources/MemoryDistributor.java | 4 +-
.../common/resources/TestMemoryDistributor.java | 11 +-
.../tez/runtime/task/TestTaskExecution2.java | 7 +-
.../TestWeightedScalingMemoryDistributor.java | 5 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 3 +-
36 files changed, 511 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d996ff..72b2c97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2745. ClassNotFoundException of user code should fail dag
TEZ-2754. Tez UI: StartTime & EndTime is not displayed with right format in Graphical View
TEZ-2752. logUnsuccessful completion in Attempt should write original finish
time to ATS
@@ -163,6 +164,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2745. ClassNotFoundException of user code should fail dag
TEZ-2761. Tez UI: update the progress on the dag and vertices pages with info from AM
TEZ-2731. Fix Tez GenericCounter performance bottleneck
TEZ-2752. logUnsuccessful completion in Attempt should write original finish
@@ -402,6 +404,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2745. ClassNotFoundException of user code should fail dag
TEZ-2752. logUnsuccessful completion in Attempt should write original finish
time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
@@ -618,6 +621,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2745. ClassNotFoundException of user code should fail dag
TEZ-2752. logUnsuccessful completion in Attempt should write original finish
time to ATS
TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index e1c7d00..cb20f49 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.TezUncheckedException;
@Private
public abstract class FrameworkClient {
@@ -39,7 +41,11 @@ public abstract class FrameworkClient {
boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
if (isLocal) {
- return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+ try {
+ return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+ } catch (TezReflectionException e) {
+ throw new TezUncheckedException("Fail to create LocalClient", e);
+ }
}
return new TezYarnClient(YarnClient.createYarnClient());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index e39cf4f..0c50d86 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -356,7 +357,7 @@ public class TezClient {
historyACLPolicyManager = ReflectionUtils.createClazzInstance(
atsHistoryACLManagerClassName);
historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
- } catch (TezUncheckedException e) {
+ } catch (TezReflectionException e) {
if (!amConfig.getTezConfiguration().getBoolean(
TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
index f1eb0ae..4d89ed4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.TezUncheckedException;
@Private
@@ -36,55 +37,44 @@ public class ReflectionUtils {
private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
@Private
- public static Class<?> getClazz(String className) {
+ public static Class<?> getClazz(String className) throws TezReflectionException {
Class<?> clazz = CLAZZ_CACHE.get(className);
if (clazz == null) {
try {
clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
- throw new TezUncheckedException("Unable to load class: " + className, e);
+ throw new TezReflectionException("Unable to load class: " + className, e);
}
}
return clazz;
}
- private static <T> T getNewInstance(Class<T> clazz) {
+ private static <T> T getNewInstance(Class<T> clazz) throws TezReflectionException {
T instance;
try {
instance = clazz.newInstance();
- } catch (InstantiationException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(
+ } catch (Exception e) {
+ throw new TezReflectionException(
"Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
}
return instance;
}
- private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters) {
+ private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters)
+ throws TezReflectionException {
T instance;
try {
Constructor<T> constructor = clazz.getConstructor(parameterTypes);
instance = constructor.newInstance(parameters);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(
+ } catch (Exception e) {
+ throw new TezReflectionException(
"Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e);
}
return instance;
}
@Private
- public static <T> T createClazzInstance(String className) {
+ public static <T> T createClazzInstance(String className) throws TezReflectionException {
Class<?> clazz = getClazz(className);
@SuppressWarnings("unchecked")
T instance = (T) getNewInstance(clazz);
@@ -92,7 +82,8 @@ public class ReflectionUtils {
}
@Private
- public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters) {
+ public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters)
+ throws TezReflectionException {
Class<?> clazz = getClazz(className);
@SuppressWarnings("unchecked")
T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
@@ -101,20 +92,20 @@ public class ReflectionUtils {
@Private
@SuppressWarnings("unchecked")
- public static <T> T invokeMethod(Object target, Method method, Object... args) {
+ public static <T> T invokeMethod(Object target, Method method, Object... args) throws TezReflectionException {
try {
return (T) method.invoke(target, args);
} catch (Exception e) {
- throw new TezUncheckedException(e);
+ throw new TezReflectionException(e);
}
}
@Private
- public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) {
+ public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) throws TezReflectionException {
try {
return targetClazz.getMethod(methodName, parameterTypes);
} catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
+ throw new TezReflectionException(e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
new file mode 100644
index 0000000..4d8d1e0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class TezReflectionException extends TezException {
+
+ private static final long serialVersionUID = 7744789121243630729L;
+
+ public TezReflectionException(String message) {
+ super(message);
+ }
+
+ public TezReflectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TezReflectionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
index f544198..c0569dd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,7 +166,7 @@ public class TimelineReaderFactory {
try {
authenticator = getTokenAuthenticator();
authenticator.setConnectionConfigurator(connectionConfigurator);
- } catch (TezUncheckedException e) {
+ } catch (TezException e) {
throw new IOException("Failed to get authenticator", e);
}
@@ -179,13 +178,17 @@ public class TimelineReaderFactory {
doAsUser = null;
}
- HttpURLConnectionFactory connectionFactory =
- new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
- authUgi, doAsUser);
+ HttpURLConnectionFactory connectionFactory;
+ try {
+ connectionFactory = new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
+ authUgi, doAsUser);
+ } catch (TezException e) {
+ throw new IOException("Fail to create TokenAuthenticatedURLConnectionFactory", e);
+ }
return new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
}
- private static Authenticator getTokenAuthenticator() {
+ private static Authenticator getTokenAuthenticator() throws TezException {
String authenticatorClazzName;
if (UserGroupInformation.isSecurityEnabled()) {
@@ -208,7 +211,7 @@ public class TimelineReaderFactory {
public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator,
Authenticator authenticator,
UserGroupInformation authUgi,
- String doAsUser) {
+ String doAsUser) throws TezException {
this.connConfigurator = connConfigurator;
this.authenticator = authenticator;
this.authUgi = authUgi;
@@ -377,7 +380,7 @@ public class TimelineReaderFactory {
isTokenDelegationClassesPresent = true;
- } catch (TezUncheckedException e) {
+ } catch (TezException e) {
LOG.info("Could not find class required for token delegation, will fallback to pseudo auth");
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
index 253e3a7..2fbd35c 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
import org.junit.Test;
public class TestReflectionUtils {
@@ -45,7 +46,7 @@ public class TestReflectionUtils {
}
@Test(timeout = 5000)
- public void testConstructorWithParameters()
+ public void testConstructorWithParameters() throws TezReflectionException
{
Class<?>[] parameterTypes = new Class[] { String.class, Integer.TYPE };
Object[] parameters = new Object[] { new String("test"), 1 };
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 04c7b82..fee13c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -59,6 +59,7 @@ import java.util.regex.Pattern;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
@@ -1053,7 +1054,8 @@ public class DAGAppMaster extends AbstractService {
protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> entityDescriptors) {
+ List<NamedEntityDescriptor> entityDescriptors)
+ throws TezException {
TaskCommunicatorManagerInterface tcm =
new TaskCommunicatorManager(context, thh, chh, entityDescriptors);
return tcm;
@@ -1079,7 +1081,7 @@ public class DAGAppMaster extends AbstractService {
protected ContainerLauncherManager createContainerLauncherManager(
List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isLocal) throws
- UnknownHostException {
+ UnknownHostException, TezException {
return new ContainerLauncherManager(context, taskCommunicatorManager, workingDirectory,
containerLauncherDescriptors, isLocal);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 42df259..cfb34ac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import org.apache.commons.collections4.ListUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
@@ -103,7 +104,7 @@ public class TaskCommunicatorManager extends AbstractService implements
public TaskCommunicatorManager(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+ List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
super(TaskCommunicatorManager.class.getName());
this.context = context;
this.taskHeartbeatHandler = thh;
@@ -141,7 +142,7 @@ public class TaskCommunicatorManager extends AbstractService implements
@VisibleForTesting
TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
- int taskCommIndex) {
+ int taskCommIndex) throws TezException {
if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
} else if (taskCommDescriptor.getEntityName()
@@ -167,7 +168,8 @@ public class TaskCommunicatorManager extends AbstractService implements
@VisibleForTesting
TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
- NamedEntityDescriptor taskCommDescriptor) {
+ NamedEntityDescriptor taskCommDescriptor)
+ throws TezException {
LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
taskCommDescriptor.getClassName());
Class<? extends TaskCommunicator> taskCommClazz =
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4ee00fa..4a8a286 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.*;
import org.apache.tez.dag.api.event.VertexState;
@@ -106,7 +107,7 @@ public class RootInputInitializerManager {
}
public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
- inputs) {
+ inputs) throws TezException {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
InputInitializerContext context =
@@ -133,7 +134,7 @@ public class RootInputInitializerManager {
@VisibleForTesting
protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
- input, InputInitializerContext context) {
+ input, InputInitializerContext context) throws TezException {
InputInitializer initializer = ReflectionUtils
.createClazzInstance(input.getControllerDescriptor().getClassName(),
new Class[]{InputInitializerContext.class}, new Object[]{context});
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 756ed28..da9c416 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1467,7 +1467,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
- createDAGEdges(this);
+ try {
+ createDAGEdges(this);
+ } catch (TezException e2) {
+ String msg = "Fail to create edges, " + ExceptionUtils.getStackTrace(e2);
+ addDiagnostic(msg);
+ LOG.error(msg);
+ trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
+ finished(DAGState.FAILED);
+ return DAGState.FAILED;
+ }
Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
// setup the dag
@@ -1489,7 +1498,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
- assignDAGScheduler(this);
+ try {
+ assignDAGScheduler(this);
+ } catch (TezException e1) {
+ String msg = "Fail to assign DAGScheduler for dag:" + dagName + " due to "
+ + ExceptionUtils.getStackTrace(e1);
+ LOG.error(msg);
+ addDiagnostic(msg);
+ trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
+ finished(DAGState.FAILED);
+ return DAGState.FAILED;
+ }
for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
String groupName = entry.getKey();
@@ -1510,7 +1529,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return DAGState.INITED;
}
- private void createDAGEdges(DAGImpl dag) {
+ private void createDAGEdges(DAGImpl dag) throws TezException {
for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
EdgeProperty edgeProperty = DagTypeConverters
.createEdgePropertyMapFromDAGPlan(edgePlan);
@@ -1521,7 +1540,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
- private static void assignDAGScheduler(DAGImpl dag) {
+ private static void assignDAGScheduler(DAGImpl dag) throws TezException {
String dagSchedulerClassName = dag.dagConf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index da74a46..0be7790 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
@@ -119,14 +120,14 @@ public class Edge {
.newConcurrentMap();
@SuppressWarnings("rawtypes")
- public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) {
+ public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) throws TezException {
this.edgeProperty = edgeProperty;
this.eventHandler = eventHandler;
this.conf = conf;
createEdgeManager();
}
- private void createEdgeManager() {
+ private void createEdgeManager() throws TezException {
switch (edgeProperty.getDataMovementType()) {
case ONE_TO_ONE:
edgeManagerContext = new EdgeManagerPluginContextImpl(null);
@@ -160,7 +161,7 @@ public class Edge {
default:
String message = "Unknown edge data movement type: "
+ edgeProperty.getDataMovementType();
- throw new TezUncheckedException(message);
+ throw new TezException(message);
}
}
@@ -182,7 +183,11 @@ public class Edge {
public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
this.edgeProperty = newEdgeProperty;
boolean wasUnInitialized = (edgeManager == null);
- createEdgeManager();
+ try {
+ createEdgeManager();
+ } catch (TezException e) {
+ throw new AMUserCodeException(Source.EdgeManager, e);
+ }
initialize();
if (wasUnInitialized) {
sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this,
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f4dd7dc..3dae42b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -2469,7 +2470,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- assignVertexManager();
+ try {
+ assignVertexManager();
+ } catch (TezException e1) {
+ String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1);
+ LOG.error(msg);
+ return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+ }
try {
vertexManager.initialize();
@@ -2512,7 +2519,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return VertexState.INITED;
}
- private void assignVertexManager() {
+ private void assignVertexManager() throws TezException {
boolean hasBipartite = false;
boolean hasOneToOne = false;
boolean hasCustom = false;
@@ -3359,7 +3366,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
if (vertex.inputsWithInitializers != null) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
- vertex.setupInputInitializerManager();
+ try {
+ vertex.setupInputInitializerManager();
+ } catch (TezException e) {
+ String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
+ LOG.info(msg);
+ return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+ }
return VertexState.INITIALIZING;
} else {
boolean hasOneToOneUninitedSource = false;
@@ -3390,7 +3403,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// this block may return VertexState.INITIALIZING
if (vertex.inputsWithInitializers != null) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
- vertex.setupInputInitializerManager();
+ try {
+ vertex.setupInputInitializerManager();
+ } catch (TezException e) {
+ String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
+ LOG.error(msg);
+ return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+ }
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
@@ -4560,7 +4579,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- private void setupInputInitializerManager() {
+ private void setupInputInitializerManager() throws TezException {
rootInputInitializerManager = createRootInputInitializerManager(
getDAG().getName(), getName(), getVertexId(),
eventHandler, getTotalTasks(),
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index bb512a9..32f7a42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -43,6 +43,7 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -368,7 +369,7 @@ public class VertexManager {
}
public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi,
- Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
+ Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) throws TezException {
checkNotNull(pluginDesc, "pluginDesc is null");
checkNotNull(managedVertex, "managedVertex is null");
checkNotNull(appContext, "appContext is null");
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 15a10bd..9e56f44 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -14,19 +14,19 @@
package org.apache.tez.dag.app.launcher;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
@@ -70,7 +70,7 @@ public class ContainerLauncherManager extends AbstractService
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
List<NamedEntityDescriptor> containerLauncherDescriptors,
- boolean isPureLocalMode) {
+ boolean isPureLocalMode) throws TezException {
super(ContainerLauncherManager.class.getName());
this.appContext = context;
@@ -101,7 +101,7 @@ public class ContainerLauncherManager extends AbstractService
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
int containerLauncherIndex,
- boolean isPureLocalMode) {
+ boolean isPureLocalMode) throws TezException {
if (containerLauncherDescriptor.getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) {
return createYarnContainerLauncher(containerLauncherContext);
@@ -144,20 +144,13 @@ public class ContainerLauncherManager extends AbstractService
@VisibleForTesting
@SuppressWarnings("unchecked")
ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
- NamedEntityDescriptor containerLauncherDescriptor) {
+ NamedEntityDescriptor containerLauncherDescriptor)
+ throws TezException {
LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
containerLauncherDescriptor.getClassName());
- Class<? extends ContainerLauncher> containerLauncherClazz =
- (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
- containerLauncherDescriptor.getClassName());
- try {
- Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
- .getConstructor(ContainerLauncherContext.class);
- return ctor.newInstance(containerLauncherContext);
- } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
-
+ return ReflectionUtils.createClazzInstance(containerLauncherDescriptor.getClassName(),
+ new Class[]{ContainerLauncherContext.class},
+ new Object[]{containerLauncherContext});
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 29143a2..04d7089 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -18,8 +18,6 @@
package org.apache.tez.dag.app.rm;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -33,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
@@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -392,7 +392,7 @@ public class TaskSchedulerManager extends AbstractService implements
AppContext appContext,
NamedEntityDescriptor taskSchedulerDescriptor,
long customAppIdIdentifier,
- int schedulerId) {
+ int schedulerId) throws TezException {
TaskSchedulerContext rawContext =
new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
@@ -429,24 +429,17 @@ public class TaskSchedulerManager extends AbstractService implements
@SuppressWarnings("unchecked")
TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
NamedEntityDescriptor taskSchedulerDescriptor,
- int schedulerId) {
+ int schedulerId) throws TezException {
LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
taskSchedulerDescriptor.getClassName());
- Class<? extends TaskScheduler> taskSchedulerClazz =
- (Class<? extends TaskScheduler>) ReflectionUtils
- .getClazz(taskSchedulerDescriptor.getClassName());
- try {
- Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
- .getConstructor(TaskSchedulerContext.class);
- return ctor.newInstance(taskSchedulerContext);
- } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
+ return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(),
+ new Class[]{TaskSchedulerContext.class},
+ new Object[]{taskSchedulerContext});
}
@VisibleForTesting
protected void instantiateSchedulers(String host, int port, String trackingUrl,
- AppContext appContext) {
+ AppContext appContext) throws TezException {
// Iterate over the list and create all the taskSchedulers
int j = 0;
for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
@@ -467,7 +460,7 @@ public class TaskSchedulerManager extends AbstractService implements
@Override
- public synchronized void serviceStart() {
+ public synchronized void serviceStart() throws Exception {
InetSocketAddress serviceAddr = clientService.getBindAddress();
dagAppMaster = appContext.getAppMaster();
// if web service is enabled then set tracking url. else disable it (value = "").
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index be7adde..1cd8bb1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -66,7 +67,7 @@ public class TestTaskCommunicatorManager {
}
@Test(timeout = 5000)
- public void testNoTaskCommSpecified() throws IOException {
+ public void testNoTaskCommSpecified() throws IOException, TezException {
AppContext appContext = mock(AppContext.class);
TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -83,7 +84,7 @@ public class TestTaskCommunicatorManager {
}
@Test(timeout = 5000)
- public void testCustomTaskCommSpecified() throws IOException {
+ public void testCustomTaskCommSpecified() throws IOException, TezException {
AppContext appContext = mock(AppContext.class);
TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -118,7 +119,7 @@ public class TestTaskCommunicatorManager {
}
@Test(timeout = 5000)
- public void testMultipleTaskComms() throws IOException {
+ public void testMultipleTaskComms() throws IOException, TezException {
AppContext appContext = mock(AppContext.class);
TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
@@ -250,13 +251,13 @@ public class TestTaskCommunicatorManager {
public TaskCommManagerForMultipleCommTest(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+ List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
super(context, thh, chh, taskCommunicatorDescriptors);
}
@Override
TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
- int taskCommIndex) {
+ int taskCommIndex) throws TezException {
numTaskComms.incrementAndGet();
boolean added = taskCommIndices.add(taskCommIndex);
assertTrue("Cannot add multiple taskComms with the same index", added);
@@ -283,7 +284,7 @@ public class TestTaskCommunicatorManager {
@Override
TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
- NamedEntityDescriptor taskCommDescriptor) {
+ NamedEntityDescriptor taskCommDescriptor) throws TezException {
taskCommContexts.add(taskCommunicatorContext);
TaskCommunicator spyComm =
spy(super.createCustomTaskCommunicator(taskCommunicatorContext, taskCommDescriptor));
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index e8ce429..117d3b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -107,7 +107,7 @@ public class TestTaskCommunicatorManager1 {
TezTaskAttemptID taskAttemptID;
@Before
- public void setUp() {
+ public void setUp() throws TezException {
appId = ApplicationId.newInstance(1000, 1);
appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
dag = mock(DAG.class);
@@ -304,7 +304,7 @@ public class TestTaskCommunicatorManager1 {
// TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
@Test (timeout= 5000)
- public void testPortRange_NotSpecified() throws IOException {
+ public void testPortRange_NotSpecified() throws IOException, TezException {
Configuration conf = new Configuration();
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
"fakeIdentifier"));
@@ -396,7 +396,7 @@ public class TestTaskCommunicatorManager1 {
public TaskCommunicatorManagerInterfaceImplForTest(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> taskCommDescriptors) {
+ List<NamedEntityDescriptor> taskCommDescriptors) throws TezException {
super(context, thh, chh, taskCommDescriptors);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
index d75b0e5..a7652a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -41,6 +42,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
@@ -58,7 +60,7 @@ import org.mockito.ArgumentCaptor;
public class TestTaskCommunicatorManager2 {
@Test(timeout = 5000)
- public void testTaskAttemptFailedKilled() throws IOException {
+ public void testTaskAttemptFailedKilled() throws IOException, TezException {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
Credentials credentials = new Credentials();
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ac4f61b..676ae33 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -744,6 +744,76 @@ public class TestDAGImpl {
return dag;
}
+ /**
+ * Builds a two-vertex plan (vertex1 -> vertex2) joined by the CUSTOM edge "e1" whose
+ * edge manager descriptor names the class "non-exist-edge-manager".
+ */
+ private DAGPlan createDAGWithNonExistEdgeManager() {
+ LOG.info("Setting up dag plan with non-exist edgemanager");
+
+ // Source vertex: one task, single out-edge e1.
+ PlanTaskLocationHint hint1 = PlanTaskLocationHint.newBuilder()
+ .addHost("host1")
+ .addRack("rack1")
+ .build();
+ PlanTaskConfiguration taskConfig1 = PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build();
+ VertexPlan vertex1 = VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(hint1)
+ .setTaskConfig(taskConfig1)
+ .addOutEdgeId("e1")
+ .build();
+
+ // Destination vertex: two tasks, single in-edge e1.
+ PlanTaskLocationHint hint2 = PlanTaskLocationHint.newBuilder()
+ .addHost("host2")
+ .addRack("rack2")
+ .build();
+ PlanTaskConfiguration taskConfig2 = PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("foo")
+ .setTaskModule("x2.y2")
+ .build();
+ VertexPlan vertex2 = VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
+ .addTaskLocationHint(hint2)
+ .setTaskConfig(taskConfig2)
+ .addInEdgeId("e1")
+ .build();
+
+ // Custom edge whose edge manager class name is not a loadable class.
+ EdgePlan edge = EdgePlan.newBuilder()
+ .setEdgeManager(TezEntityDescriptorProto.newBuilder()
+ .setClassName("non-exist-edge-manager"))
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build();
+
+ return DAGPlan.newBuilder()
+ .setName("testverteximpl")
+ .addVertex(vertex1)
+ .addVertex(vertex2)
+ .addEdge(edge)
+ .build();
+ }
+
+
@BeforeClass
public static void beforeClass() {
MockDNSToSwitchMapping.initializeMockRackResolver();
@@ -969,6 +1039,39 @@ public class TestDAGImpl {
}
}
+ /**
+ * Initializes a DAG built from createDAGWithNonExistEdgeManager() and verifies that it
+ * ends up FAILED with INIT_FAILURE and that its diagnostics name the ClassNotFoundException.
+ */
+ @Test(timeout = 5000)
+ public void testNonExistEdgeManagerPlugin() {
+ dagPlan = createDAGWithNonExistEdgeManager();
+ dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
+ taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext);
+ dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+ doReturn(dag).when(appContext).getCurrentDAG();
+
+ DAGEvent initEvent = new DAGEvent(dagId, DAGEventType.DAG_INIT);
+ dag.handle(initEvent);
+
+ Assert.assertEquals(DAGState.FAILED, dag.getState());
+ Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+ String diagnostics = StringUtils.join(dag.getDiagnostics(), "");
+ Assert.assertTrue(
+ diagnostics.contains("java.lang.ClassNotFoundException: non-exist-edge-manager"));
+ }
+
+ /**
+ * Sets TEZ_AM_DAG_SCHEDULER_CLASS to a class name that cannot be loaded and verifies that
+ * DAG_INIT leaves the DAG FAILED with INIT_FAILURE and that its diagnostics name the
+ * ClassNotFoundException.
+ */
+ @Test (timeout = 5000)
+ public void testNonExistDAGScheduler() {
+ conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, "non-exist-dag-scheduler");
+ dag = new DAGImpl(dagId, conf, dagPlan,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
+ fsTokens, clock, "user", thh, appContext);
+ dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+ doReturn(dag).when(appContext).getCurrentDAG();
+
+ dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+ // The state is asserted once; the original repeated this assertion verbatim.
+ Assert.assertEquals(DAGState.FAILED, dag.getState());
+ Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+ Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "")
+ .contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler"));
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexCompletion() {
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index eb03d1e..f53e505 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -54,6 +54,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
@@ -158,7 +159,7 @@ public class TestEdge {
@SuppressWarnings({ "rawtypes" })
@Test (timeout = 5000)
- public void testCompositeEventHandling() throws AMUserCodeException {
+ public void testCompositeEventHandling() throws TezException {
EventHandler eventHandler = mock(EventHandler.class);
EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class),
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index eb68a6f..a54c56a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -575,6 +576,100 @@ public class TestVertexImpl {
return dag;
}
+ /**
+ * Builds a single-vertex plan whose root input "input1" has a controller descriptor
+ * naming the class "non-exist-input-initializer".
+ */
+ private DAGPlan createDAGPlanWithNonExistInputInitializer() {
+ LOG.info("Setting up dag plan with non exist inputinitializer");
+
+ RootInputLeafOutputProto input1 = RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "non-exist-input-initializer"))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build())
+ .build();
+
+ PlanTaskConfiguration taskConfig = PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build();
+
+ VertexPlan vertex1 = VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(input1)
+ .setTaskConfig(taskConfig)
+ .build();
+
+ return DAGPlan.newBuilder()
+ .setName("initializerWith0Tasks")
+ .addVertex(vertex1)
+ .build();
+ }
+
+ /**
+ * Builds a single-vertex plan whose leaf output "output1" has a controller descriptor
+ * naming the class "non-exist-output-committer".
+ */
+ private DAGPlan createDAGPlanWithNonExistOutputCommitter() {
+ LOG.info("Setting up dag plan with non exist output committer");
+
+ RootInputLeafOutputProto output1 = RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ "non-exist-output-committer"))
+ .setName("output1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("OutputClazz")
+ .build())
+ .build();
+
+ PlanTaskConfiguration taskConfig = PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build();
+
+ VertexPlan vertex1 = VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addOutputs(output1)
+ .setTaskConfig(taskConfig)
+ .build();
+
+ return DAGPlan.newBuilder()
+ .setName("initializerWith0Tasks")
+ .addVertex(vertex1)
+ .build();
+ }
+
+ /**
+ * Builds a single-vertex plan whose vertex manager plugin descriptor names the class
+ * "non-exist-vertexmanager".
+ */
+ private DAGPlan createDAGPlanWithNonExistVertexManager() {
+ LOG.info("Setting up dag plan with non-exist VertexManager");
+
+ PlanTaskConfiguration taskConfig = PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build();
+
+ VertexPlan vertex1 = VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(taskConfig)
+ .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+ .setClassName("non-exist-vertexmanager"))
+ .build();
+
+ return DAGPlan.newBuilder()
+ .setName("initializerWith0Tasks")
+ .addVertex(vertex1)
+ .build();
+ }
+
private DAGPlan createDAGPlanWithMixedEdges() {
LOG.info("Setting up mixed edge dag plan");
org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create("MixedEdges");
@@ -2151,7 +2246,7 @@ public class TestVertexImpl {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public void setupPostDagCreation() throws AMUserCodeException {
+ public void setupPostDagCreation() throws TezException {
String dagName = "dag0";
// dispatcher may be created multiple times (setupPostDagCreation may be called multiples)
if (dispatcher != null) {
@@ -2266,7 +2361,7 @@ public class TestVertexImpl {
}
@Before
- public void setup() throws AMUserCodeException {
+ public void setup() throws TezException {
useCustomInitializer = false;
customInitializer = null;
setupPreDagCreation();
@@ -2385,6 +2480,45 @@ public class TestVertexImpl {
.getOutputDescriptor().getClassName()));
}
+ /**
+ * Sends V_INIT to "vertex1" of the plan from createDAGPlanWithNonExistVertexManager() and
+ * verifies the vertex is FAILED with INIT_FAILURE and that its diagnostics name the
+ * ClassNotFoundException.
+ */
+ @Test(timeout=5000)
+ public void testNonExistVertexManager() throws TezException {
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithNonExistVertexManager();
+ setupPostDagCreation();
+
+ VertexImpl vertexUnderTest = vertices.get("vertex1");
+ VertexEvent initEvent = new VertexEvent(vertexUnderTest.getVertexId(), VertexEventType.V_INIT);
+ vertexUnderTest.handle(initEvent);
+
+ Assert.assertEquals(VertexState.FAILED, vertexUnderTest.getState());
+ Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexUnderTest.getTerminationCause());
+ String diagnostics = StringUtils.join(vertexUnderTest.getDiagnostics(), "");
+ Assert.assertTrue(
+ diagnostics.contains("java.lang.ClassNotFoundException: non-exist-vertexmanager"));
+ }
+
+ /**
+ * Sends V_INIT to "vertex1" of the plan from createDAGPlanWithNonExistInputInitializer() and
+ * verifies the vertex is FAILED with INIT_FAILURE and that its diagnostics name the
+ * ClassNotFoundException.
+ */
+ @Test(timeout=5000)
+ public void testNonExistInputInitializer() throws TezException {
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithNonExistInputInitializer();
+ setupPostDagCreation();
+
+ VertexImpl vertexUnderTest = vertices.get("vertex1");
+ VertexEvent initEvent = new VertexEvent(vertexUnderTest.getVertexId(), VertexEventType.V_INIT);
+ vertexUnderTest.handle(initEvent);
+
+ Assert.assertEquals(VertexState.FAILED, vertexUnderTest.getState());
+ Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexUnderTest.getTerminationCause());
+ String diagnostics = StringUtils.join(vertexUnderTest.getDiagnostics(), "");
+ Assert.assertTrue(
+ diagnostics.contains("java.lang.ClassNotFoundException: non-exist-input-initializer"));
+ }
+
+ /**
+ * Sends V_INIT to "vertex1" of the plan from createDAGPlanWithNonExistOutputCommitter() and
+ * verifies the vertex is FAILED with INIT_FAILURE and that its diagnostics name the
+ * ClassNotFoundException.
+ */
+ @Test(timeout=5000)
+ public void testNonExistOutputCommitter() throws TezException {
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithNonExistOutputCommitter();
+ setupPostDagCreation();
+
+ VertexImpl vertexUnderTest = vertices.get("vertex1");
+ VertexEvent initEvent = new VertexEvent(vertexUnderTest.getVertexId(), VertexEventType.V_INIT);
+ vertexUnderTest.handle(initEvent);
+
+ Assert.assertEquals(VertexState.FAILED, vertexUnderTest.getState());
+ Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexUnderTest.getTerminationCause());
+ String diagnostics = StringUtils.join(vertexUnderTest.getDiagnostics(), "");
+ Assert.assertTrue(
+ diagnostics.contains("java.lang.ClassNotFoundException: non-exist-output-committer"));
+ }
+
class TestUpdateListener implements VertexStateUpdateListener {
List<VertexStateUpdate> events = Lists.newLinkedList();
@Override
@@ -3734,7 +3868,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testVertexManagerHeuristic() throws AMUserCodeException {
+ public void testVertexManagerHeuristic() throws TezException {
setupPreDagCreation();
dagPlan = createDAGPlanWithMixedEdges();
setupPostDagCreation();
@@ -3991,7 +4125,7 @@ public class TestVertexImpl {
}
@Test(timeout = 10000)
- public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, AMUserCodeException {
+ public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, TezException {
useCustomInitializer = true;
customInitializer = new RootInitializerSettingParallelismTo0(null);
RootInitializerSettingParallelismTo0 initializer =
@@ -4533,7 +4667,7 @@ public class TestVertexImpl {
* If broadcast, one-to-one or custom edges are present in source, tasks should not start until
* 1 task from each source vertex is complete.
*/
- public void testTaskSchedulingWithCustomEdges() throws AMUserCodeException {
+ public void testTaskSchedulingWithCustomEdges() throws TezException {
setupPreDagCreation();
dagPlan = createCustomDAGWithCustomEdges();
setupPostDagCreation();
@@ -5359,7 +5493,7 @@ public class TestVertexImpl {
hasShutDown = true;
}
- public void failInputInitialization() {
+ public void failInputInitialization() throws TezException {
super.runInputInitializers(inputs);
eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
.get(0).getName(),
@@ -5408,7 +5542,7 @@ public class TestVertexImpl {
}
@Test(timeout=5000)
- public void testVertexGroupInput() throws AMUserCodeException {
+ public void testVertexGroupInput() throws TezException {
setupPreDagCreation();
dagPlan = createVertexGroupDAGPlan();
setupPostDagCreation();
@@ -5558,7 +5692,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testInitStartRace() throws AMUserCodeException {
+ public void testInitStartRace() throws TezException {
// Race when a source vertex manages to start before the target vertex has
// been initialized
setupPreDagCreation();
@@ -5581,7 +5715,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testInitStartRace2() throws AMUserCodeException {
+ public void testInitStartRace2() throws TezException {
// Race when a source vertex manages to start before the target vertex has
// been initialized
setupPreDagCreation();
@@ -5608,7 +5742,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testTez2684() throws AMUserCodeException, IOException {
+ public void testTez2684() throws IOException, TezException {
setupPreDagCreation();
dagPlan = createSamplerDAGPlan2();
setupPostDagCreation();
@@ -5677,7 +5811,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testExceptionFromVM_Initialize() throws AMUserCodeException {
+ public void testExceptionFromVM_Initialize() throws TezException {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.Initialize);
@@ -5837,7 +5971,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testExceptionFromII_Initialize() throws AMUserCodeException, InterruptedException {
+ public void testExceptionFromII_Initialize() throws InterruptedException, TezException {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize);
EventHandlingRootInputInitializer initializer =
@@ -5960,7 +6094,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException {
+ public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedException, TezException {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
EventHandlingRootInputInitializer initializer =
@@ -5989,7 +6123,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException {
+ public void testExceptionFromII_InitSucceededAfterInitFailure() throws InterruptedException, TezException {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
EventHandlingRootInputInitializer initializer =
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
index 4b931d4..a8af808 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
@@ -61,7 +63,7 @@ public class TestContainerLauncherManager {
}
@Test(timeout = 5000)
- public void testNoLaunchersSpecified() throws IOException {
+ public void testNoLaunchersSpecified() throws IOException, TezException {
AppContext appContext = mock(AppContext.class);
TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
@@ -77,7 +79,7 @@ public class TestContainerLauncherManager {
}
@Test(timeout = 5000)
- public void testCustomLauncherSpecified() throws IOException {
+ public void testCustomLauncherSpecified() throws IOException, TezException {
Configuration conf = new Configuration(false);
AppContext appContext = mock(AppContext.class);
@@ -111,7 +113,7 @@ public class TestContainerLauncherManager {
}
@Test(timeout = 5000)
- public void testMultipleContainerLaunchers() throws IOException {
+ public void testMultipleContainerLaunchers() throws IOException, TezException {
Configuration conf = new Configuration(false);
conf.set("testkey", "testvalue");
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
@@ -261,7 +263,7 @@ public class TestContainerLauncherManager {
String workingDirectory,
List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isPureLocalMode) throws
- UnknownHostException {
+ UnknownHostException, TezException {
super(context, taskCommunicatorManagerInterface, workingDirectory,
containerLauncherDescriptors, isPureLocalMode);
}
@@ -273,7 +275,7 @@ public class TestContainerLauncherManager {
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
int containerLauncherIndex,
- boolean isPureLocalMode) {
+ boolean isPureLocalMode) throws TezException {
numContainerLaunchers.incrementAndGet();
boolean added = containerLauncherIndices.add(containerLauncherIndex);
assertTrue("Cannot add multiple launchers with the same index", added);
@@ -306,7 +308,7 @@ public class TestContainerLauncherManager {
@Override
ContainerLauncher createCustomContainerLauncher(
ContainerLauncherContext containerLauncherContext,
- NamedEntityDescriptor containerLauncherDescriptor) {
+ NamedEntityDescriptor containerLauncherDescriptor) throws TezException {
ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
containerLauncherContext, containerLauncherDescriptor));
testContainerLaunchers.add(spyLauncher);
@@ -338,7 +340,7 @@ public class TestContainerLauncherManager {
}
}
- private static class FakeContainerLauncher extends ContainerLauncher {
+ public static class FakeContainerLauncher extends ContainerLauncher {
public FakeContainerLauncher(
ContainerLauncherContext containerLauncherContext) {
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 98b7baa..4db51b9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
@@ -532,7 +533,7 @@ public class TestTaskSchedulerManager {
eq(launchRequest2));
}
- private static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager {
+ public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager {
private final TaskScheduler yarnTaskScheduler;
private final TaskScheduler uberTaskScheduler;
@@ -562,7 +563,7 @@ public class TestTaskSchedulerManager {
AppContext appContext,
NamedEntityDescriptor taskSchedulerDescriptor,
long customAppIdIdentifier,
- int schedulerId) {
+ int schedulerId) throws TezException {
numCreateInvocations.incrementAndGet();
boolean added = seenSchedulers.add(schedulerId);
@@ -596,7 +597,8 @@ public class TestTaskSchedulerManager {
@Override
TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
- NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) {
+ NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId)
+ throws TezException {
taskSchedulerContexts.add(taskSchedulerContext);
TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId));
testTaskSchedulers.add(taskScheduler);
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 3507d99..b215a06 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -83,7 +84,7 @@ public class TestHistoryEventsProtoConversion {
TestHistoryEventsProtoConversion.class);
- private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException {
+ private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
HistoryEvent deserializedEvent = null;
event.toProtoStream(os);
@@ -100,7 +101,7 @@ public class TestHistoryEventsProtoConversion {
}
private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent)
- throws IOException {
+ throws IOException, TezException {
SummaryEvent event = (SummaryEvent) historyEvent;
ByteArrayOutputStream os = new ByteArrayOutputStream();
HistoryEvent deserializedEvent = null;
@@ -757,7 +758,7 @@ public class TestHistoryEventsProtoConversion {
}
}
- private void testDAGRecoveredEvent() {
+ private void testDAGRecoveredEvent() throws TezException {
DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index 0c1c327..4f3a0f2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
/**
@@ -106,10 +107,14 @@ public class TezGroupedSplit implements InputSplit, Configurable {
public void readFields(DataInput in) throws IOException {
wrappedInputFormatName = Text.readString(in);
String inputSplitClassName = Text.readString(in);
- Class<? extends InputSplit> clazz =
- (Class<? extends InputSplit>)
- TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
-
+ Class<? extends InputSplit> clazz = null;
+ try {
+ clazz = (Class<? extends InputSplit>)
+ TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+ } catch (TezException e) {
+ throw new IOException(e);
+ }
+
int numSplits = in.readInt();
wrappedSplits = new ArrayList<InputSplit>(numSplits);
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 707f9ad..b361aec 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.split.SplitSizeEstimator;
import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TezException;
import com.google.common.base.Preconditions;
@@ -93,24 +93,28 @@ public class TezGroupedSplitsInputFormat<K, V>
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
TezGroupedSplit groupedSplit = (TezGroupedSplit) split;
- initInputFormatFromSplit(groupedSplit);
+ try {
+ initInputFormatFromSplit(groupedSplit);
+ } catch (TezException e) {
+ throw new IOException(e);
+ }
return new TezGroupedSplitsRecordReader(groupedSplit, job, reporter);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- void initInputFormatFromSplit(TezGroupedSplit split) {
+ void initInputFormatFromSplit(TezGroupedSplit split) throws TezException {
if (wrappedInputFormat == null) {
Class<? extends InputFormat> clazz = (Class<? extends InputFormat>)
getClassFromName(split.wrappedInputFormatName);
try {
wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
- throw new TezUncheckedException(e);
+ throw new TezException(e);
}
}
}
- static Class<?> getClassFromName(String name) {
+ static Class<?> getClassFromName(String name) throws TezException {
return ReflectionUtils.getClazz(name);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index 9275f14..f85bbcd 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
/**
@@ -108,9 +109,13 @@ public class TezGroupedSplit extends InputSplit
public void readFields(DataInput in) throws IOException {
wrappedInputFormatName = Text.readString(in);
String inputSplitClassName = Text.readString(in);
- Class<? extends InputSplit> clazz =
- (Class<? extends InputSplit>)
- TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+ Class<? extends InputSplit> clazz = null;
+ try {
+ clazz = (Class<? extends InputSplit>)
+ TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
+ } catch (TezException e) {
+ throw new IOException(e);
+ }
int numSplits = in.readInt();
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 519b52a..8aabbf6 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import com.google.common.base.Preconditions;
@@ -126,24 +127,28 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
TezGroupedSplit groupedSplit = (TezGroupedSplit) split;
- initInputFormatFromSplit(groupedSplit);
+ try {
+ initInputFormatFromSplit(groupedSplit);
+ } catch (TezException e) {
+ throw new IOException(e);
+ }
return new TezGroupedSplitsRecordReader(groupedSplit, context);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- void initInputFormatFromSplit(TezGroupedSplit split) {
+ void initInputFormatFromSplit(TezGroupedSplit split) throws TezException {
if (wrappedInputFormat == null) {
Class<? extends InputFormat> clazz = (Class<? extends InputFormat>)
getClassFromName(split.wrappedInputFormatName);
try {
wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
- throw new TezUncheckedException(e);
+ throw new TezException(e);
}
}
}
- static Class<?> getClassFromName(String name) {
+ static Class<?> getClassFromName(String name) throws TezException {
return ReflectionUtils.getClazz(name);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 9a2d77e..d0e935f 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -40,6 +40,7 @@ import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -128,7 +129,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
historyACLPolicyManager = ReflectionUtils.createClazzInstance(
atsHistoryACLManagerClassName);
historyACLPolicyManager.setConf(conf);
- } catch (TezUncheckedException e) {
+ } catch (TezReflectionException e) {
LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+ ". ACLs cannot be enforced correctly for history data in Timeline", e);
if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,