You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:59 UTC

[21/43] tez git commit: TEZ-2117. Add a manager for ContainerLaunchers running in the AM. (sseth)

TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f27b83a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f27b83a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f27b83a

Branch: refs/heads/TEZ-2003
Commit: 5f27b83ad5c02b3ce111694246ba536de543f691
Parents: aadd049
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 18 14:45:34 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:01 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  54 ++--------
 .../tez/dag/app/launcher/ContainerLauncher.java |   2 +-
 .../app/launcher/ContainerLauncherRouter.java   | 108 +++++++++++++++++++
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   5 +-
 5 files changed, 124 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 975ce65..1cd74a4 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -2,5 +2,6 @@ ALL CHANGES:
   TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
   TEZ-2006. Task communication plane needs to be pluggable.
   TEZ-2090. Add tests for jobs running in external services.
+  TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index bfc2d58..0f4d812 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -25,8 +25,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -139,9 +137,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
-import org.apache.tez.dag.app.launcher.ContainerLauncher;
-import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
-import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
+import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
@@ -226,7 +222,7 @@ public class DAGAppMaster extends AbstractService {
   private AppContext context;
   private Configuration amConf;
   private AsyncDispatcher dispatcher;
-  private ContainerLauncher containerLauncher;
+  private ContainerLauncherRouter containerLauncherRouter;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
   private TaskAttemptListener taskAttemptListener;
@@ -504,9 +500,9 @@ public class DAGAppMaster extends AbstractService {
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
-    containerLauncher = createContainerLauncher(context);
-    addIfService(containerLauncher, true);
-    dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+    this.containerLauncherRouter = createContainerLauncherRouter(conf);
+    addIfService(containerLauncherRouter, true);
+    dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
 
     historyEventHandler = createHistoryEventHandler(context);
     addIfService(historyEventHandler, true);
@@ -1034,38 +1030,10 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-  protected ContainerLauncher
-      createContainerLauncher(final AppContext context) throws UnknownHostException {
-    if(isLocal){
-      LOG.info("Creating LocalContainerLauncher");
-      return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
-    } else {
-      // TODO: Temporary reflection with specific parameters until a clean interface is defined.
-      String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
-      if (containerLauncherClassName == null) {
-        LOG.info("Creating Default Container Launcher");
-        return new ContainerLauncherImpl(context);
-      } else {
-        LOG.info("Creating container launcher : " + containerLauncherClassName);
-        Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-            containerLauncherClassName);
-        try {
-          Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
-              .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
-          ctor.setAccessible(true);
-          ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
-          return instance;
-        } catch (NoSuchMethodException e) {
-          throw new TezUncheckedException(e);
-        } catch (InvocationTargetException e) {
-          throw new TezUncheckedException(e);
-        } catch (InstantiationException e) {
-          throw new TezUncheckedException(e);
-        } catch (IllegalAccessException e) {
-          throw new TezUncheckedException(e);
-        }
-      }
-    }
+  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws
+      UnknownHostException {
+    return  new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory);
+
   }
 
   public ApplicationId getAppID() {
@@ -1088,8 +1056,8 @@ public class DAGAppMaster extends AbstractService {
     return dispatcher;
   }
 
-  public ContainerLauncher getContainerLauncher() {
-    return containerLauncher;
+  public ContainerLauncherRouter getContainerLauncherRouter() {
+    return containerLauncherRouter;
   }
 
   public TaskAttemptListener getTaskAttemptListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
index 305eb50..8a8498f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 
-public interface ContainerLauncher 
+public interface ContainerLauncher
     extends EventHandler<NMCommunicatorEvent> {
 
     void dagComplete(DAG dag);

http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
new file mode 100644
index 0000000..34001ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+
+public class ContainerLauncherRouter extends AbstractService
+    implements EventHandler<NMCommunicatorEvent> {
+
+  static final Log LOG = LogFactory.getLog(ContainerLauncherRouter.class);
+
+  private final ContainerLauncher containerLauncher;
+
+  @VisibleForTesting
+  public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
+    super(ContainerLauncherRouter.class.getName());
+    this.containerLauncher = containerLauncher;
+  }
+
+  // Accepting conf to setup final parameters, if required.
+  public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context,
+                                 TaskAttemptListener taskAttemptListener,
+                                 String workingDirectory) throws UnknownHostException {
+    super(ContainerLauncherRouter.class.getName());
+
+    if (isLocal) {
+      LOG.info("Creating LocalContainerLauncher");
+      containerLauncher =
+          new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
+    } else {
+      // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+      String containerLauncherClassName =
+          conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+      if (containerLauncherClassName == null) {
+        LOG.info("Creating Default Container Launcher");
+        containerLauncher = new ContainerLauncherImpl(context);
+      } else {
+        LOG.info("Creating container launcher : " + containerLauncherClassName);
+        Class<? extends ContainerLauncher> containerLauncherClazz =
+            (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+                containerLauncherClassName);
+        try {
+          Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+              .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+          ctor.setAccessible(true);
+          containerLauncher = ctor.newInstance(context, conf, taskAttemptListener);
+        } catch (NoSuchMethodException e) {
+          throw new TezUncheckedException(e);
+        } catch (InvocationTargetException e) {
+          throw new TezUncheckedException(e);
+        } catch (InstantiationException e) {
+          throw new TezUncheckedException(e);
+        } catch (IllegalAccessException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
+
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    ((AbstractService)containerLauncher).init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    ((AbstractService)containerLauncher).start();
+  }
+
+  @Override
+  public void serviceStop() {
+    ((AbstractService)containerLauncher).stop();
+  }
+
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    containerLauncher.handle(event);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/5f27b83a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b846922..7274cde 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -57,6 +57,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
@@ -509,9 +510,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
   
   // use mock container launcher for tests
   @Override
-  protected ContainerLauncher createContainerLauncher(final AppContext context)
+  protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf)
       throws UnknownHostException {
-    return containerLauncher;
+    return new ContainerLauncherRouter(containerLauncher);
   }
 
   @Override