You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:59 UTC
[21/43] tez git commit: TEZ-2117. Add a manager for
ContainerLaunchers running in the AM. (sseth)
TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f27b83a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f27b83a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f27b83a
Branch: refs/heads/TEZ-2003
Commit: 5f27b83ad5c02b3ce111694246ba536de543f691
Parents: aadd049
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 18 14:45:34 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:01 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 54 ++--------
.../tez/dag/app/launcher/ContainerLauncher.java | 2 +-
.../app/launcher/ContainerLauncherRouter.java | 108 +++++++++++++++++++
.../apache/tez/dag/app/MockDAGAppMaster.java | 5 +-
5 files changed, 124 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 975ce65..1cd74a4 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -2,5 +2,6 @@ ALL CHANGES:
TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
TEZ-2006. Task communication plane needs to be pluggable.
TEZ-2090. Add tests for jobs running in external services.
+ TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index bfc2d58..0f4d812 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -25,8 +25,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -139,9 +137,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
-import org.apache.tez.dag.app.launcher.ContainerLauncher;
-import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
-import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
+import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
@@ -226,7 +222,7 @@ public class DAGAppMaster extends AbstractService {
private AppContext context;
private Configuration amConf;
private AsyncDispatcher dispatcher;
- private ContainerLauncher containerLauncher;
+ private ContainerLauncherRouter containerLauncherRouter;
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
private TaskAttemptListener taskAttemptListener;
@@ -504,9 +500,9 @@ public class DAGAppMaster extends AbstractService {
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
- containerLauncher = createContainerLauncher(context);
- addIfService(containerLauncher, true);
- dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+ this.containerLauncherRouter = createContainerLauncherRouter(conf);
+ addIfService(containerLauncherRouter, true);
+ dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
historyEventHandler = createHistoryEventHandler(context);
addIfService(historyEventHandler, true);
@@ -1034,38 +1030,10 @@ public class DAGAppMaster extends AbstractService {
return chh;
}
- protected ContainerLauncher
- createContainerLauncher(final AppContext context) throws UnknownHostException {
- if(isLocal){
- LOG.info("Creating LocalContainerLauncher");
- return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
- } else {
- // TODO: Temporary reflection with specific parameters until a clean interface is defined.
- String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
- if (containerLauncherClassName == null) {
- LOG.info("Creating Default Container Launcher");
- return new ContainerLauncherImpl(context);
- } else {
- LOG.info("Creating container launcher : " + containerLauncherClassName);
- Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
- containerLauncherClassName);
- try {
- Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
- .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
- ctor.setAccessible(true);
- ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
- return instance;
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
- }
- }
+ protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws
+ UnknownHostException {
+ return new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory);
+
}
public ApplicationId getAppID() {
@@ -1088,8 +1056,8 @@ public class DAGAppMaster extends AbstractService {
return dispatcher;
}
- public ContainerLauncher getContainerLauncher() {
- return containerLauncher;
+ public ContainerLauncherRouter getContainerLauncherRouter() {
+ return containerLauncherRouter;
}
public TaskAttemptListener getTaskAttemptListener() {
http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
index 305eb50..8a8498f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-public interface ContainerLauncher
+public interface ContainerLauncher
extends EventHandler<NMCommunicatorEvent> {
void dagComplete(DAG dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
new file mode 100644
index 0000000..34001ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+
+public class ContainerLauncherRouter extends AbstractService
+ implements EventHandler<NMCommunicatorEvent> {
+
+ static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+ private final ContainerLauncher containerLauncher;
+
+ @VisibleForTesting
+ public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
+ super(ContainerLauncherRouter.class.getName());
+ this.containerLauncher = containerLauncher;
+ }
+
+ // Accepting conf to setup final parameters, if required.
+ public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context,
+ TaskAttemptListener taskAttemptListener,
+ String workingDirectory) throws UnknownHostException {
+ super(ContainerLauncherRouter.class.getName());
+
+ if (isLocal) {
+ LOG.info("Creating LocalContainerLauncher");
+ containerLauncher =
+ new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
+ } else {
+ // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+ String containerLauncherClassName =
+ conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+ if (containerLauncherClassName == null) {
+ LOG.info("Creating Default Container Launcher");
+ containerLauncher = new ContainerLauncherImpl(context);
+ } else {
+ LOG.info("Creating container launcher : " + containerLauncherClassName);
+ Class<? extends ContainerLauncher> containerLauncherClazz =
+ (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+ containerLauncherClassName);
+ try {
+ Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+ .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+ ctor.setAccessible(true);
+ containerLauncher = ctor.newInstance(context, conf, taskAttemptListener);
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ ((AbstractService)containerLauncher).init(conf);
+ }
+
+ @Override
+ public void serviceStart() {
+ ((AbstractService)containerLauncher).start();
+ }
+
+ @Override
+ public void serviceStop() {
+ ((AbstractService)containerLauncher).stop();
+ }
+
+
+ @Override
+ public void handle(NMCommunicatorEvent event) {
+ containerLauncher.handle(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b846922..7274cde 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -57,6 +57,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
@@ -509,9 +510,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
// use mock container launcher for tests
@Override
- protected ContainerLauncher createContainerLauncher(final AppContext context)
+ protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf)
throws UnknownHostException {
- return containerLauncher;
+ return new ContainerLauncherRouter(containerLauncher);
}
@Override