You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/18 23:41:06 UTC
[19/23] tez git commit: TEZ-2019. Temporarily allow the scheduler and
launcher to be specified via configuration. (sseth)
TEZ-2019. Temporarily allow the scheduler and launcher to be specified
via configuration. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ee954577
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ee954577
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ee954577
Branch: refs/heads/TEZ-2003
Commit: ee95457755e04cf546f761467c9091332ed05442
Parents: 269905b
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 30 16:02:32 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 18 14:39:25 2015 -0800
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 4 +++
.../apache/tez/dag/api/TezConfiguration.java | 6 ++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 30 ++++++++++++++++-
.../dag/app/rm/TaskSchedulerEventHandler.java | 34 ++++++++++++++++++--
.../org/apache/tez/runtime/task/TezChild.java | 3 +-
5 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ee954577/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
new file mode 100644
index 0000000..1822fcb
--- /dev/null
+++ b/TEZ-2003-CHANGES.txt
@@ -0,0 +1,4 @@
+ALL CHANGES:
+ TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
+
+INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/ee954577/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0bf78f9..c35a853 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1136,6 +1136,12 @@ public class TezConfiguration extends Configuration {
+ "tez-ui.webservice.enable";
public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+
+
// TODO only validate property here, value can also be validated if necessary
public static void validateProperty(String property, Scope usedScope) {
Scope validScope = PropertyScope.get(property);
http://git-wip-us.apache.org/repos/asf/tez/blob/ee954577/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8fd5626..9f523ac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,6 +27,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -91,6 +93,7 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezUtilsInternal;
@@ -938,9 +941,34 @@ public class DAGAppMaster extends AbstractService {
protected ContainerLauncher
createContainerLauncher(final AppContext context) throws UnknownHostException {
if(isLocal){
+ LOG.info("Creating LocalContainerLauncher");
return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
} else {
- return new ContainerLauncherImpl(context);
+ // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+ String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+ if (containerLauncherClassName == null) {
+ LOG.info("Creating Default Container Launcher");
+ return new ContainerLauncherImpl(context);
+ } else {
+ LOG.info("Creating container launcher : " + containerLauncherClassName);
+ Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+ containerLauncherClassName);
+ try {
+ Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+ .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+ ctor.setAccessible(true);
+ ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
+ return instance;
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ee954577/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 05cbc66..97bd7c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.app.rm;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -329,12 +332,39 @@ public class TaskSchedulerEventHandler extends AbstractService
boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
if (isLocal) {
+ LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
}
else {
- return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
+ String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
+ if (schedulerClassName == null) {
+ LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
+ return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+ host, port, trackingUrl, appContext);
+ } else {
+ LOG.info("Using custom TaskScheduler: " + schedulerClassName);
+ // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
+ Class<? extends TaskSchedulerService> taskSchedulerClazz =
+ (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+ try {
+ Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+ .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+ Integer.class, String.class, Configuration.class);
+ ctor.setAccessible(true);
+ TaskSchedulerService taskSchedulerService =
+ ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+ return taskSchedulerService;
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ee954577/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d537846..6164e52 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -356,7 +356,8 @@ public class TezChild {
DefaultMetricsSystem.shutdown();
if (!isLocal) {
RPC.stopProxy(umbilical);
- LogManager.shutdown();
+ // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
+// LogManager.shutdown();
}
}
}