You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:53 UTC

[6/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
deleted file mode 100644
index 07d269d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.launcher;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
-import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-
-// TODO See what part of this lifecycle and state management can be simplified.
-// Ideally, no state - only sendStart / sendStop.
-
-// TODO Review this entire code and clean it up.
-
-/**
- * Launches and stops containers by issuing start / stop requests through a
- * {@link ContainerManagementProtocolProxy}.
- *
- * <p>Requests submitted via {@link #launchContainer} and {@link #stopContainer}
- * are placed on {@code eventQueue}. A single event-handling thread drains the
- * queue and hands each operation to {@code launcherPool}, where an
- * {@link EventProcessor} applies it to the per-container {@link Container}
- * state object tracked in {@code containers}.
- */
-public class ContainerLauncherImpl extends ContainerLauncher {
-
-  // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
-  static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
-
-  // Per-container state keyed by container id. Entries are added lazily by
-  // getContainer() and removed by removeContainerIfDone().
-  private final ConcurrentHashMap<ContainerId, Container> containers =
-    new ConcurrentHashMap<>();
-  // Pool that runs the launch / stop operations; created in start().
-  protected ThreadPoolExecutor launcherPool;
-  // Core pool size used when the pool is created, and the head-room added each
-  // time the core pool size is bumped.
-  protected static final int INITIAL_POOL_SIZE = 10;
-  // Upper bound for the core pool size; read from configuration in the constructor.
-  private final int limitOnPoolSize;
-  private final Configuration conf;
-  // Thread that drains eventQueue and dispatches to launcherPool; created in start().
-  private Thread eventHandlingThread;
-  protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
-  private ContainerManagementProtocolProxy cmProxy;
-  // Set once by shutdown(); guards against repeated stops and suppresses the
-  // interrupt error log in the event-handling thread.
-  private AtomicBoolean serviceStopped = new AtomicBoolean(false);
-
-  /**
-   * Returns the state object for the container targeted by {@code event},
-   * creating and registering one if none is tracked yet.
-   */
-  private Container getContainer(ContainerOp event) {
-    ContainerId id = event.getBaseOperation().getContainerId();
-    Container c = containers.get(id);
-    if(c == null) {
-      c = new Container(id,
-          event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken());
-      // Another thread may have registered the container in the meantime; if so,
-      // use the instance that won the race.
-      Container old = containers.putIfAbsent(id, c);
-      if(old != null) {
-        c = old;
-      }
-    }
-    return c;
-  }
-
-  /**
-   * Stops tracking the container once it has reached the DONE or FAILED state.
-   */
-  private void removeContainerIfDone(ContainerId id) {
-    Container c = containers.get(id);
-    if(c != null && c.isCompletelyDone()) {
-      containers.remove(id);
-    }
-  }
-
-
-  // States a tracked container moves through. PREP is the initial state;
-  // KILLED_BEFORE_LAUNCH records a stop request that arrived while still in PREP.
-  private static enum ContainerState {
-    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
-  }
-
-  /**
-   * State holder for a single container. {@link #launch} and {@link #kill} are
-   * synchronized on the instance, so operations on one container are serialized.
-   */
-  private class Container {
-    private ContainerState state;
-    // store enough information to be able to cleanup the container
-    private ContainerId containerID;
-    final private String containerMgrAddress;
-    private Token containerToken;
-
-    public Container(ContainerId containerID,
-        String containerMgrAddress, Token containerToken) {
-      this.state = ContainerState.PREP;
-      this.containerMgrAddress = containerMgrAddress;
-      this.containerID = containerID;
-      this.containerToken = containerToken;
-    }
-
-    /** Returns true when the state is DONE or FAILED. */
-    public synchronized boolean isCompletelyDone() {
-      return state == ContainerState.DONE || state == ContainerState.FAILED;
-    }
-
-    /**
-     * Sends a start request for this container.
-     *
-     * <p>If the container was killed before this call, no request is sent: the
-     * state becomes DONE and a launch failure is reported. Otherwise the state
-     * becomes RUNNING on success, or FAILED (with a launch failure reported)
-     * if anything is thrown along the way.
-     */
-    @SuppressWarnings("unchecked")
-    public synchronized void launch(ContainerLaunchRequest event) {
-      LOG.info("Launching Container with Id: " + event.getContainerId());
-      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
-        state = ContainerState.DONE;
-        sendContainerLaunchFailedMsg(event.getContainerId(),
-            "Container was killed before it was launched");
-        return;
-      }
-
-      ContainerManagementProtocolProxyData proxy = null;
-      try {
-
-        proxy = getCMProxy(containerID, containerMgrAddress,
-            containerToken);
-
-        // Construct the actual Container
-        ContainerLaunchContext containerLaunchContext =
-          event.getContainerLaunchContext();
-
-        // Now launch the actual container
-        StartContainerRequest startRequest = Records
-          .newRecord(StartContainerRequest.class);
-        startRequest.setContainerToken(event.getContainerToken());
-        startRequest.setContainerLaunchContext(containerLaunchContext);
-
-        StartContainersResponse response =
-            proxy.getContainerManagementProtocol().startContainers(
-                StartContainersRequest.newInstance(
-                    Collections.singletonList(startRequest)));
-        // Only one container was requested, so any failed request is ours:
-        // rethrow it so the catch block below reports the failure.
-        if (response.getFailedRequests() != null
-            && !response.getFailedRequests().isEmpty()) {
-          throw response.getFailedRequests().get(containerID).deSerialize();
-        }
-
-        // after launching, send launched event to task attempt to move
-        // it from ASSIGNED to RUNNING state
-        getContext().containerLaunched(containerID);
-        this.state = ContainerState.RUNNING;
-      } catch (Throwable t) {
-        String message = "Container launch failed for " + containerID + " : "
-            + ExceptionUtils.getStackTrace(t);
-        this.state = ContainerState.FAILED;
-        sendContainerLaunchFailedMsg(containerID, message);
-      } finally {
-        if (proxy != null) {
-          cmProxy.mayBeCloseProxy(proxy);
-        }
-      }
-    }
-
-    /**
-     * Stops this container.
-     *
-     * <p>Does nothing if the container is already DONE or FAILED. A container
-     * still in PREP is only marked KILLED_BEFORE_LAUNCH so that a later
-     * {@link #launch} is rejected. In any other state a stop request is sent
-     * and the state becomes DONE whether or not the request succeeds.
-     */
-    @SuppressWarnings("unchecked")
-    public synchronized void kill() {
-
-      if(isCompletelyDone()) {
-        return;
-      }
-      if(this.state == ContainerState.PREP) {
-        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-      } else {
-        LOG.info("Sending a stop request to the NM for ContainerId: "
-            + containerID);
-
-        ContainerManagementProtocolProxyData proxy = null;
-        try {
-          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-              this.containerToken);
-
-            // kill the remote container if already launched
-            StopContainersRequest stopRequest = Records
-              .newRecord(StopContainersRequest.class);
-            stopRequest.setContainerIds(Collections.singletonList(containerID));
-
-            proxy.getContainerManagementProtocol().stopContainers(stopRequest);
-
-            // If stopContainer returns without an error, assuming the stop made
-            // it over to the NodeManager.
-          getContext().containerStopRequested(containerID);
-        } catch (Throwable t) {
-
-          // ignore the cleanup failure
-          // (it is reported to the context and logged, but not rethrown)
-          String message = "cleanup failed for container "
-            + this.containerID + " : "
-            + ExceptionUtils.getStackTrace(t);
-          getContext().containerStopFailed(containerID, message);
-          LOG.warn(message);
-          this.state = ContainerState.DONE;
-          return;
-        } finally {
-          if (proxy != null) {
-            cmProxy.mayBeCloseProxy(proxy);
-          }
-        }
-        this.state = ContainerState.DONE;
-      }
-    }
-  }
-
-  /**
-   * Builds the launcher's configuration from the context's initial user payload
-   * and reads the thread pool size limit from it.
-   *
-   * @param containerLauncherContext context supplying the user payload
-   * @throws TezUncheckedException if the user payload cannot be parsed
-   */
-  public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
-    super(containerLauncherContext);
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
-    } catch (IOException e) {
-      throw new TezUncheckedException(
-          "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e);
-    }
-    // Override the IPC client connection max idle time to 0 for this launcher's conf.
-    conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
-        0);
-    this.limitOnPoolSize = conf.getInt(
-        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
-        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT);
-    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
-  }
-
-  /**
-   * Creates the protocol proxy and the launcher thread pool, then starts the
-   * thread that drains {@code eventQueue} and dispatches each operation to the
-   * pool.
-   */
-  @Override
-  public void start() {
-    // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
-    cmProxy =
-        new ContainerManagementProtocolProxy(conf);
-
-    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
-        "ContainerLauncher #%d").setDaemon(true).build();
-
-    // Start with a default core-pool size of 10 and change it dynamically.
-    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
-        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
-        new LinkedBlockingQueue<Runnable>(),
-        tf, new CustomizedRejectedExecutionHandler());
-    eventHandlingThread = new Thread() {
-      @Override
-      public void run() {
-        ContainerOp event = null;
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            // An interrupt after shutdown() is expected; only log it otherwise.
-            if(!serviceStopped.get()) {
-              LOG.error("Returning, interrupted : " + e);
-            }
-            return;
-          }
-          int poolSize = launcherPool.getCorePoolSize();
-
-          // See if we need to up the pool size only if haven't reached the
-          // maximum limit yet.
-          if (poolSize != limitOnPoolSize) {
-
-            // nodes where containers will run at *this* point of time. This is
-            // *not* the cluster size and doesn't need to be.
-            int numNodes =
-                getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
-            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
-
-            if (poolSize < idealPoolSize) {
-              // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
-              // latter is just a buffer so we are not always increasing the
-              // pool-size
-              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
-                  + INITIAL_POOL_SIZE);
-              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
-                  + " as number-of-nodes to talk to is " + numNodes);
-              launcherPool.setCorePoolSize(newPoolSize);
-            }
-          }
-
-          // the events from the queue are handled in parallel
-          // using a thread pool
-          launcherPool.execute(createEventProcessor(event));
-
-          // TODO: Group launching of multiple containers to a single
-          // NodeManager into a single connection
-        }
-      }
-    };
-    eventHandlingThread.setName("ContainerLauncher Event Handler");
-    eventHandlingThread.start();
-  }
-
-  /** Calls {@code kill()} on every container that is still tracked. */
-  private void shutdownAllContainers() {
-    for (Container ct : this.containers.values()) {
-      if (ct != null) {
-        ct.kill();
-      }
-    }
-  }
-
-  /**
-   * Kills all tracked containers, interrupts the event-handling thread and
-   * shuts the launcher pool down. Only the first call has any effect.
-   */
-  @Override
-  public void shutdown() {
-    if(!serviceStopped.compareAndSet(false, true)) {
-      LOG.info("Ignoring multiple stops");
-      return;
-    }
-    // shutdown any containers that might be left running
-    shutdownAllContainers();
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
-    }
-    if (launcherPool != null) {
-      launcherPool.shutdownNow();
-    }
-  }
-
-  /** Creates the runnable that processes {@code event}; overridable by subclasses. */
-  protected EventProcessor createEventProcessor(ContainerOp event) {
-    return new EventProcessor(event);
-  }
-
-  /**
-   * Obtains a protocol proxy for the given container from {@code cmProxy}.
-   * The {@code containerToken} parameter is not used by this implementation.
-   */
-  protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
-      ContainerId containerID, final String containerManagerBindAddr,
-      Token containerToken) throws IOException {
-    return cmProxy.getProxy(containerManagerBindAddr, containerID);
-  }
-
-  /**
-   * Setup and start the container on remote nodemanager.
-   * Also handles stop requests: the operation type of the wrapped event decides
-   * whether the container is launched or killed.
-   */
-  class EventProcessor implements Runnable {
-    private ContainerOp event;
-
-    EventProcessor(ContainerOp event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Processing operation {}", event.toString());
-
-      // Load ContainerManager tokens before creating a connection.
-      // TODO: Do it only once per NodeManager.
-      ContainerId containerID = event.getBaseOperation().getContainerId();
-
-      Container c = getContainer(event);
-      switch(event.getOpType()) {
-        case LAUNCH_REQUEST:
-          ContainerLaunchRequest launchRequest = event.getLaunchRequest();
-          c.launch(launchRequest);
-          break;
-        case STOP_REQUEST:
-          c.kill();
-          break;
-      }
-      // Drop the tracking entry if the operation left the container DONE or FAILED.
-      removeContainerIfDone(containerID);
-    }
-  }
-
-  /**
-   * ThreadPoolExecutor.submit may fail if a task is submitted
-   * while the ThreadPoolExecutor is shutting down (DAGAppMaster is shutting down).
-   * This handler only logs the rejection rather than aborting the application.
-   */
-  private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler {
-    @Override
-    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-      LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor);
-    }
-  }
-
-  /** Logs {@code message} at error level and reports the launch failure to the context. */
-  @SuppressWarnings("unchecked")
-  void sendContainerLaunchFailedMsg(ContainerId containerId,
-      String message) {
-    LOG.error(message);
-    getContext().containerLaunchFailed(containerId, message);
-  }
-
-
-  /**
-   * Queues a launch request for asynchronous processing.
-   *
-   * @throws TezUncheckedException if interrupted while adding to the queue
-   */
-  @Override
-  public void launchContainer(ContainerLaunchRequest launchRequest) {
-    try {
-      eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
-    } catch (InterruptedException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  /**
-   * Queues a stop request for asynchronous processing.
-   *
-   * @throws TezUncheckedException if interrupted while adding to the queue
-   */
-  @Override
-  public void stopContainer(ContainerStopRequest stopRequest) {
-    try {
-      eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
-    } catch (InterruptedException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
new file mode 100644
index 0000000..15a10bd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerLauncherContextImpl;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns the configured {@link ContainerLauncher} plugins and routes each
+ * {@link ContainerLauncherEvent} to the launcher selected by the event's
+ * launcher id.
+ *
+ * <p>Every launcher is wrapped in a {@link ServicePluginLifecycleAbstractService};
+ * this service's init / start / stop calls are forwarded to those wrappers.
+ */
+public class ContainerLauncherManager extends AbstractService
+    implements EventHandler<ContainerLauncherEvent> {
+
+  static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherManager.class);
+
+  // The three arrays below are index-aligned: entry i of each refers to the
+  // same launcher, and the index is the launcher id carried by events.
+  @VisibleForTesting
+  final ContainerLauncher containerLaunchers[];
+  @VisibleForTesting
+  final ContainerLauncherContext containerLauncherContexts[];
+  protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
+  private final AppContext appContext;
+
+  /**
+   * Creates a manager around a single, pre-built launcher (launcher id 0).
+   *
+   * @param containerLauncher the only launcher to manage
+   * @param context the application context used to resolve scheduler and
+   *                task communicator names
+   */
+  @VisibleForTesting
+  public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
+    super(ContainerLauncherManager.class.getName());
+    this.appContext = context;
+    containerLaunchers = new ContainerLauncher[] {containerLauncher};
+    containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+        new ServicePluginLifecycleAbstractService<>(containerLauncher)};
+  }
+
+  /**
+   * Creates one launcher, with its own context and lifecycle wrapper, for each
+   * descriptor. The position of a descriptor in the list becomes its launcher id.
+   *
+   * @param context the application context
+   * @param taskCommunicatorManagerInterface passed to each launcher context and
+   *        to the uber launcher
+   * @param workingDirectory working directory handed to the uber launcher
+   * @param containerLauncherDescriptors descriptors of the launchers to create;
+   *        must be non-null and non-empty
+   * @param isPureLocalMode local-mode flag handed to the uber launcher
+   * @throws IllegalArgumentException if no descriptors are supplied
+   */
+  public ContainerLauncherManager(AppContext context,
+                                  TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+                                  String workingDirectory,
+                                  List<NamedEntityDescriptor> containerLauncherDescriptors,
+                                  boolean isPureLocalMode) {
+    super(ContainerLauncherManager.class.getName());
+
+    this.appContext = context;
+    Preconditions.checkArgument(
+        containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
+        "ContainerLauncherDescriptors must be specified");
+    containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
+    containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
+
+
+    for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
+      UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload();
+      ContainerLauncherContext containerLauncherContext =
+          new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
+      containerLauncherContexts[i] = containerLauncherContext;
+      containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
+          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode);
+      containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
+    }
+  }
+
+  /**
+   * Picks the launcher implementation from the descriptor's entity name: the
+   * YARN plugin name and the uber plugin name map to the built-in launchers,
+   * any other name is treated as a custom launcher class.
+   */
+  @VisibleForTesting
+  ContainerLauncher createContainerLauncher(
+      NamedEntityDescriptor containerLauncherDescriptor,
+      AppContext context,
+      ContainerLauncherContext containerLauncherContext,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+      String workingDirectory,
+      int containerLauncherIndex,
+      boolean isPureLocalMode) {
+    if (containerLauncherDescriptor.getEntityName().equals(
+        TezConstants.getTezYarnServicePluginName())) {
+      return createYarnContainerLauncher(containerLauncherContext);
+    } else if (containerLauncherDescriptor.getEntityName()
+        .equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberContainerLauncher(containerLauncherContext, context,
+          taskCommunicatorManagerInterface,
+          workingDirectory, isPureLocalMode);
+    } else {
+      return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor);
+    }
+  }
+
+  /** Creates the launcher used for the YARN service plugin. */
+  @VisibleForTesting
+  ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    LOG.info("Creating DefaultContainerLauncher");
+    return new TezContainerLauncherImpl(containerLauncherContext);
+  }
+
+  /**
+   * Creates the launcher used for the uber service plugin.
+   *
+   * @throws TezUncheckedException if the launcher fails with an
+   *         {@link UnknownHostException} during construction
+   */
+  @VisibleForTesting
+  ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
+                                                AppContext context,
+                                                TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+                                                String workingDirectory,
+                                                boolean isPureLocalMode) {
+    LOG.info("Creating LocalContainerLauncher");
+    // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
+    // extensive internals which are only available at runtime. Will likely require
+    // some kind of runtime binding of parameters in the payload to work correctly.
+    try {
+      return
+          new LocalContainerLauncher(containerLauncherContext, context,
+              taskCommunicatorManagerInterface,
+              workingDirectory, isPureLocalMode);
+    } catch (UnknownHostException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Reflectively instantiates the launcher class named by the descriptor, using
+   * its constructor that takes a single {@link ContainerLauncherContext}.
+   *
+   * @throws TezUncheckedException if that constructor is missing, inaccessible,
+   *         or fails
+   */
+  @VisibleForTesting
+  @SuppressWarnings("unchecked")
+  ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
+                                                  NamedEntityDescriptor containerLauncherDescriptor) {
+    LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
+        containerLauncherDescriptor.getClassName());
+    Class<? extends ContainerLauncher> containerLauncherClazz =
+        (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+            containerLauncherDescriptor.getClassName());
+    try {
+      Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+          .getConstructor(ContainerLauncherContext.class);
+      return ctor.newInstance(containerLauncherContext);
+    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+      throw new TezUncheckedException(e);
+    }
+
+  }
+
+  /** Initializes every launcher's lifecycle wrapper with {@code conf}. */
+  @Override
+  public void serviceInit(Configuration conf) {
+    for (ServicePluginLifecycleAbstractService<?> wrapper : containerLauncherServiceWrappers) {
+      wrapper.init(conf);
+    }
+  }
+
+  /** Starts every launcher's lifecycle wrapper. */
+  @Override
+  public void serviceStart() {
+    for (ServicePluginLifecycleAbstractService<?> wrapper : containerLauncherServiceWrappers) {
+      wrapper.start();
+    }
+  }
+
+  /** Stops every launcher's lifecycle wrapper. */
+  @Override
+  public void serviceStop() {
+    for (ServicePluginLifecycleAbstractService<?> wrapper : containerLauncherServiceWrappers) {
+      wrapper.stop();
+    }
+  }
+
+  public void dagComplete(DAG dag) {
+    // Nothing required at the moment. Containers are shared across DAGs
+  }
+
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+
+  /**
+   * Converts a launcher event into the matching plugin request and hands it to
+   * the launcher at index {@code event.getLauncherId()}.
+   *
+   * <p>Scheduler and task communicator names are resolved from the event's ids
+   * through the application context and carried on the request.
+   */
+  @Override
+  public void handle(ContainerLauncherEvent event) {
+    int launcherId = event.getLauncherId();
+    String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
+    String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        ContainerLauncherLaunchRequestEvent launchEvent = (ContainerLauncherLaunchRequestEvent) event;
+        ContainerLaunchRequest launchRequest =
+            new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
+                launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
+                launchEvent.getContainer(), schedulerName,
+                taskCommName);
+        containerLaunchers[launcherId].launchContainer(launchRequest);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        ContainerStopRequest stopRequest =
+            new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
+                event.getContainerToken(), schedulerName, taskCommName);
+        containerLaunchers[launcherId].stopContainer(stopRequest);
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
deleted file mode 100644
index b56bd5b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.launcher;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerLauncherContextImpl;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ContainerLauncherRouter extends AbstractService
-    implements EventHandler<NMCommunicatorEvent> {
-
-  static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
-
-  @VisibleForTesting
-  final ContainerLauncher containerLaunchers[];
-  @VisibleForTesting
-  final ContainerLauncherContext containerLauncherContexts[];
-  protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
-  private final AppContext appContext;
-
-  @VisibleForTesting
-  public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) {
-    super(ContainerLauncherRouter.class.getName());
-    this.appContext = context;
-    containerLaunchers = new ContainerLauncher[] {containerLauncher};
-    containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
-    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
-        new ServicePluginLifecycleAbstractService<>(containerLauncher)};
-  }
-
-  // Accepting conf to setup final parameters, if required.
-  public ContainerLauncherRouter(AppContext context,
-                                 TaskAttemptListener taskAttemptListener,
-                                 String workingDirectory,
-                                 List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                 boolean isPureLocalMode) {
-    super(ContainerLauncherRouter.class.getName());
-
-    this.appContext = context;
-    Preconditions.checkArgument(
-        containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
-        "ContainerLauncherDescriptors must be specified");
-    containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
-    containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
-    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
-
-
-    for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
-      UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload();
-      ContainerLauncherContext containerLauncherContext =
-          new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload);
-      containerLauncherContexts[i] = containerLauncherContext;
-      containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
-          containerLauncherContext, taskAttemptListener, workingDirectory, i, isPureLocalMode);
-      containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
-    }
-  }
-
-  @VisibleForTesting
-  ContainerLauncher createContainerLauncher(
-      NamedEntityDescriptor containerLauncherDescriptor,
-      AppContext context,
-      ContainerLauncherContext containerLauncherContext,
-      TaskAttemptListener taskAttemptListener,
-      String workingDirectory,
-      int containerLauncherIndex,
-      boolean isPureLocalMode) {
-    if (containerLauncherDescriptor.getEntityName().equals(
-        TezConstants.getTezYarnServicePluginName())) {
-      return createYarnContainerLauncher(containerLauncherContext);
-    } else if (containerLauncherDescriptor.getEntityName()
-        .equals(TezConstants.getTezUberServicePluginName())) {
-      return createUberContainerLauncher(containerLauncherContext, context, taskAttemptListener,
-          workingDirectory, isPureLocalMode);
-    } else {
-      return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor);
-    }
-  }
-
-  @VisibleForTesting
-  ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    LOG.info("Creating DefaultContainerLauncher");
-    return new ContainerLauncherImpl(containerLauncherContext);
-  }
-
-  @VisibleForTesting
-  ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
-                                                AppContext context,
-                                                TaskAttemptListener taskAttemptListener,
-                                                String workingDirectory,
-                                                boolean isPureLocalMode) {
-    LOG.info("Creating LocalContainerLauncher");
-    // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
-    // extensive internals which are only available at runtime. Will likely require
-    // some kind of runtime binding of parameters in the payload to work correctly.
-    try {
-      return
-          new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
-              workingDirectory, isPureLocalMode);
-    } catch (UnknownHostException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  @VisibleForTesting
-  @SuppressWarnings("unchecked")
-  ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
-                                                  NamedEntityDescriptor containerLauncherDescriptor) {
-    LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
-        containerLauncherDescriptor.getClassName());
-    Class<? extends ContainerLauncher> containerLauncherClazz =
-        (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-            containerLauncherDescriptor.getClassName());
-    try {
-      Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
-          .getConstructor(ContainerLauncherContext.class);
-      return ctor.newInstance(containerLauncherContext);
-    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
-      throw new TezUncheckedException(e);
-    }
-
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) {
-    for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      containerLauncherServiceWrappers[i].init(conf);
-    }
-  }
-
-  @Override
-  public void serviceStart() {
-    for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      containerLauncherServiceWrappers[i].start();
-    }
-  }
-
-  @Override
-  public void serviceStop() {
-    for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      containerLauncherServiceWrappers[i].stop();
-    }
-  }
-
-  public void dagComplete(DAG dag) {
-    // Nothing required at the moment. Containers are shared across DAGs
-  }
-
-  public void dagSubmitted() {
-    // Nothing to do right now. Indicates that a new DAG has been submitted and
-    // the context has updated information.
-  }
-
-
-  @Override
-  public void handle(NMCommunicatorEvent event) {
-    int launcherId = event.getLauncherId();
-    String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
-    String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
-    switch (event.getType()) {
-      case CONTAINER_LAUNCH_REQUEST:
-        NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
-        ContainerLaunchRequest launchRequest =
-            new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
-                launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
-                launchEvent.getContainer(), schedulerName,
-                taskCommName);
-        containerLaunchers[launcherId].launchContainer(launchRequest);
-        break;
-      case CONTAINER_STOP_REQUEST:
-        ContainerStopRequest stopRequest =
-            new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
-                event.getContainerToken(), schedulerName, taskCommName);
-        containerLaunchers[launcherId].stopContainer(stopRequest);
-        break;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 1d3e6df..6cd6fce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -63,7 +63,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
@@ -83,7 +83,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
   private final AppContext context;
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
-  private final TaskAttemptListener tal;
+  private final TaskCommunicatorManagerInterface tal;
   private final Map<String, String> localEnv;
   private final ExecutionContext executionContext;
   private final int numExecutors;
@@ -105,7 +105,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
 
   public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
                                 AppContext context,
-                                TaskAttemptListener taskAttemptListener,
+                                TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                 String workingDirectory,
                                 boolean isPureLocalMode) throws UnknownHostException {
     // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
@@ -114,7 +114,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     // after the AM starts up.
     super(containerLauncherContext);
     this.context = context;
-    this.tal = taskAttemptListener;
+    this.tal = taskCommunicatorManagerInterface;
     this.workingDirectory = workingDirectory;
     this.isPureLocalMode = isPureLocalMode;
     if (isPureLocalMode) {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
new file mode 100644
index 0000000..3556c51
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.launcher;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// TODO See what part of this lifecycle and state management can be simplified.
+// Ideally, no state - only sendStart / sendStop.
+
+// TODO Review this entire code and clean it up.
+
+/**
+ * This class is responsible for launching of containers.
+ */
+public class TezContainerLauncherImpl extends ContainerLauncher {
+
+  // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
+  static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
+
+  private final ConcurrentHashMap<ContainerId, Container> containers =
+    new ConcurrentHashMap<>();
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
+  private final int limitOnPoolSize;
+  private final Configuration conf;
+  private Thread eventHandlingThread;
+  protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
+  private ContainerManagementProtocolProxy cmProxy;
+  private AtomicBoolean serviceStopped = new AtomicBoolean(false);
+
+  /**
+   * Returns the tracking entry for the container targeted by the given operation, creating and
+   * registering a fresh entry when none is present yet.
+   */
+  private Container getContainer(ContainerOp event) {
+    final ContainerId containerId = event.getBaseOperation().getContainerId();
+    final Container existing = containers.get(containerId);
+    if (existing != null) {
+      return existing;
+    }
+    final Container created = new Container(containerId,
+        event.getBaseOperation().getNodeId().toString(),
+        event.getBaseOperation().getContainerToken());
+    // Another thread may have registered an entry in the meantime; the first registration wins.
+    final Container winner = containers.putIfAbsent(containerId, created);
+    return winner != null ? winner : created;
+  }
+
+  /**
+   * Drops the tracking entry for the given container once that entry reports it is completely
+   * done.
+   *
+   * @param id id of the container whose entry should be considered for removal
+   */
+  private void removeContainerIfDone(ContainerId id) {
+    Container c = containers.get(id);
+    if (c != null && c.isCompletelyDone()) {
+      // Conditional remove: evict only the exact entry that was inspected. An unconditional
+      // remove(id) could discard a newer entry registered for the same id by another pool
+      // thread between the get() above and this call.
+      containers.remove(id, c);
+    }
+  }
+
+
+  /**
+   * Per-container lifecycle states tracked by this launcher.
+   */
+  private static enum ContainerState {
+    // PREP: initial state of a newly tracked container.
+    // FAILED: the launch attempt threw.
+    // RUNNING: the start request completed without a reported failure.
+    // DONE: a stop was attempted (successfully or not), or a launch arrived after a pre-launch kill.
+    // KILLED_BEFORE_LAUNCH: a stop arrived while the container was still in PREP.
+    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+  }
+
+  /**
+   * Tracks the launch/stop state of a single container and performs the start and stop calls
+   * against the container manager proxy. All state transitions happen in synchronized methods.
+   */
+  private class Container {
+    private ContainerState state;
+    // store enough information to be able to cleanup the container
+    private ContainerId containerID;
+    final private String containerMgrAddress;
+    private Token containerToken;
+
+    /** Creates a tracking entry in the PREP state. */
+    public Container(ContainerId containerID,
+        String containerMgrAddress, Token containerToken) {
+      this.state = ContainerState.PREP;
+      this.containerMgrAddress = containerMgrAddress;
+      this.containerID = containerID;
+      this.containerToken = containerToken;
+    }
+
+    /** Returns true once the container is in a terminal state (DONE or FAILED). */
+    public synchronized boolean isCompletelyDone() {
+      return state == ContainerState.DONE || state == ContainerState.FAILED;
+    }
+
+    /**
+     * Sends the start request for this container and reports the outcome to the launcher
+     * context. If a stop was recorded before the launch, no request is sent and a launch
+     * failure is reported instead.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void launch(ContainerLaunchRequest event) {
+      LOG.info("Launching Container with Id: " + event.getContainerId());
+      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+        state = ContainerState.DONE;
+        sendContainerLaunchFailedMsg(event.getContainerId(),
+            "Container was killed before it was launched");
+        return;
+      }
+
+      ContainerManagementProtocolProxyData proxy = null;
+      try {
+
+        proxy = getCMProxy(containerID, containerMgrAddress,
+            containerToken);
+
+        // Construct the actual Container
+        ContainerLaunchContext containerLaunchContext =
+          event.getContainerLaunchContext();
+
+        // Now launch the actual container
+        StartContainerRequest startRequest = Records
+          .newRecord(StartContainerRequest.class);
+        startRequest.setContainerToken(event.getContainerToken());
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+
+        StartContainersResponse response =
+            proxy.getContainerManagementProtocol().startContainers(
+                StartContainersRequest.newInstance(
+                    Collections.singletonList(startRequest)));
+        if (response.getFailedRequests() != null
+            && !response.getFailedRequests().isEmpty()) {
+          // Rethrow the failure reported for this container; handled by the catch below.
+          throw response.getFailedRequests().get(containerID).deSerialize();
+        }
+
+        // after launching, send launched event to task attempt to move
+        // it from ASSIGNED to RUNNING state
+        getContext().containerLaunched(containerID);
+        this.state = ContainerState.RUNNING;
+      } catch (Throwable t) {
+        // Any failure, including the rethrown one above, marks the container FAILED
+        // and is reported with its full stack trace.
+        String message = "Container launch failed for " + containerID + " : "
+            + ExceptionUtils.getStackTrace(t);
+        this.state = ContainerState.FAILED;
+        sendContainerLaunchFailedMsg(containerID, message);
+      } finally {
+        if (proxy != null) {
+          cmProxy.mayBeCloseProxy(proxy);
+        }
+      }
+    }
+
+    /**
+     * Stops this container. A container still in PREP is only marked KILLED_BEFORE_LAUNCH;
+     * otherwise a stop request is sent and the container ends up DONE whether or not the
+     * request succeeded. Does nothing if the container is already in a terminal state.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void kill() {
+
+      if(isCompletelyDone()) {
+        return;
+      }
+      if(this.state == ContainerState.PREP) {
+        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      } else {
+        LOG.info("Sending a stop request to the NM for ContainerId: "
+            + containerID);
+
+        ContainerManagementProtocolProxyData proxy = null;
+        try {
+          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+              this.containerToken);
+
+            // kill the remote container if already launched
+            StopContainersRequest stopRequest = Records
+              .newRecord(StopContainersRequest.class);
+            stopRequest.setContainerIds(Collections.singletonList(containerID));
+
+            proxy.getContainerManagementProtocol().stopContainers(stopRequest);
+
+            // If stopContainer returns without an error, assuming the stop made
+            // it over to the NodeManager.
+          getContext().containerStopRequested(containerID);
+        } catch (Throwable t) {
+
+          // The cleanup failure is not rethrown: it is reported to the context and logged,
+          // and the container is still treated as DONE.
+          String message = "cleanup failed for container "
+            + this.containerID + " : "
+            + ExceptionUtils.getStackTrace(t);
+          getContext().containerStopFailed(containerID, message);
+          LOG.warn(message);
+          this.state = ContainerState.DONE;
+          return;
+        } finally {
+          if (proxy != null) {
+            cmProxy.mayBeCloseProxy(proxy);
+          }
+        }
+        this.state = ContainerState.DONE;
+      }
+    }
+  }
+
+  /**
+   * Builds the launcher's configuration from the initial user payload of the given context and
+   * reads the upper limit for the launcher thread pool from it.
+   *
+   * @param containerLauncherContext context supplying the initial user payload
+   * @throws TezUncheckedException if the user payload cannot be parsed into a configuration
+   */
+  public TezContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
+    super(containerLauncherContext);
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + TezContainerLauncherImpl.class.getSimpleName(), e);
+    }
+    // Force the IPC client's max idle time setting to 0 in the configuration used by this launcher.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    this.limitOnPoolSize = conf.getInt(
+        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+        TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
+  }
+
+  /**
+   * Creates the container manager proxy and the launcher thread pool, then starts the thread
+   * that drains the event queue and hands each queued operation to the pool.
+   */
+  @Override
+  public void start() {
+    // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
+    cmProxy =
+        new ContainerManagementProtocolProxy(conf);
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size of 10 and change it dynamically.
+    // The work queue is unbounded, so the pool never grows past its core size on its own;
+    // the core size is therefore adjusted explicitly in the loop below.
+    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(),
+        tf, new CustomizedRejectedExecutionHandler());
+    eventHandlingThread = new Thread() {
+      @Override
+      public void run() {
+        ContainerOp event = null;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            // An interrupt is expected once shutdown() has run; only log it otherwise.
+            if(!serviceStopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
+            return;
+          }
+          int poolSize = launcherPool.getCorePoolSize();
+
+          // See if we need to up the pool size; this is only considered while the
+          // maximum limit has not been reached yet.
+          if (poolSize != limitOnPoolSize) {
+
+            // nodes where containers will run at *this* point of time. This is
+            // *not* the cluster size and doesn't need to be.
+            int numNodes =
+                getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
+            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+            if (poolSize < idealPoolSize) {
+              // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE; the
+              // latter is just a buffer so we are not always increasing the
+              // pool-size
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
+              launcherPool.setCorePoolSize(newPoolSize);
+            }
+          }
+
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(createEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventHandlingThread.setName("ContainerLauncher Event Handler");
+    eventHandlingThread.start();
+  }
+
+  /** Issues a kill to every container entry currently tracked by this launcher. */
+  private void shutdownAllContainers() {
+    for (Container tracked : containers.values()) {
+      if (tracked == null) {
+        continue;
+      }
+      tracked.kill();
+    }
+  }
+
+  /**
+   * Stops the launcher. Only the first call has any effect; later calls are logged and ignored.
+   * Tracked containers are killed on the calling thread first, then the event handling thread is
+   * interrupted and the launcher pool is shut down.
+   */
+  @Override
+  public void shutdown() {
+    if(!serviceStopped.compareAndSet(false, true)) {
+      LOG.info("Ignoring multiple stops");
+      return;
+    }
+    // shutdown any containers that might be left running
+    shutdownAllContainers();
+    // Both may still be null if start() was never called.
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    if (launcherPool != null) {
+      launcherPool.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates the runnable that processes a single queued operation. Protected, so a subclass can
+   * supply a different processor.
+   */
+  protected EventProcessor createEventProcessor(ContainerOp event) {
+    return new EventProcessor(event);
+  }
+
+  /**
+   * Obtains a container manager proxy for the given address and container from the shared
+   * proxy instance created in start().
+   *
+   * @param containerID id of the container the proxy is requested for
+   * @param containerManagerBindAddr address of the container manager to talk to
+   * @param containerToken not used by this implementation
+   * @throws IOException declared for implementations that fail while obtaining the proxy
+   */
+  protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
+      ContainerId containerID, final String containerManagerBindAddr,
+      Token containerToken) throws IOException {
+    return cmProxy.getProxy(containerManagerBindAddr, containerID);
+  }
+
+  /**
+   * Processes a single queued operation on a pool thread: launches or stops the targeted
+   * container, then drops its tracking entry if it has reached a terminal state.
+   */
+  class EventProcessor implements Runnable {
+    private ContainerOp event;
+
+    EventProcessor(ContainerOp event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing operation {}", event.toString());
+
+      // Load ContainerManager tokens before creating a connection.
+      // TODO: Do it only once per NodeManager.
+      // NOTE(review): no token loading is visible in this method; the two lines above look stale.
+      ContainerId containerID = event.getBaseOperation().getContainerId();
+
+      Container c = getContainer(event);
+      switch(event.getOpType()) {
+        case LAUNCH_REQUEST:
+          ContainerLaunchRequest launchRequest = event.getLaunchRequest();
+          c.launch(launchRequest);
+          break;
+        case STOP_REQUEST:
+          c.kill();
+          break;
+      }
+      removeContainerIfDone(containerID);
+    }
+  }
+
+  /**
+   * Submitting a task to the ThreadPoolExecutor may be rejected while the executor is
+   * shutting down (i.e. while the DAGAppMaster is shutting down).
+   * This handler just logs the rejection rather than aborting the application; the rejected
+   * task is dropped.
+   */
+  private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor);
+    }
+  }
+
+  /**
+   * Logs the given message at error level and reports the launch failure for the container to
+   * the launcher context.
+   *
+   * @param containerId id of the container whose launch failed
+   * @param message diagnostic text that is both logged and reported
+   */
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId,
+      String message) {
+    LOG.error(message);
+    getContext().containerLaunchFailed(containerId, message);
+  }
+
+
+  /**
+   * Queues a launch request; it is picked up by the event handling thread and executed on the
+   * launcher pool.
+   *
+   * @param launchRequest the launch request to enqueue
+   * @throws TezUncheckedException if the calling thread is interrupted while enqueueing
+   */
+  @Override
+  public void launchContainer(ContainerLaunchRequest launchRequest) {
+    try {
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can still observe it;
+      // wrapping the exception alone would silently clear it.
+      Thread.currentThread().interrupt();
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Queues a stop request; it is picked up by the event handling thread and executed on the
+   * launcher pool.
+   *
+   * @param stopRequest the stop request to enqueue
+   * @throws TezUncheckedException if the calling thread is interrupted while enqueueing
+   */
+  @Override
+  public void stopContainer(ContainerStopRequest stopRequest) {
+    try {
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can still observe it;
+      // wrapping the exception alone would silently clear it.
+      Thread.currentThread().interrupt();
+      throw new TezUncheckedException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
new file mode 100644
index 0000000..add3254
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event addressed to a container launcher. Carries the target container, its node and token,
+ * plus the ids of the launcher, scheduler and task communicator associated with the request.
+ */
+public class ContainerLauncherEvent extends AbstractEvent<ContainerLauncherEventType> {
+
+  private final ContainerId containerId;
+  private final NodeId nodeId;
+  private final Token containerToken;
+  private final int launcherId;
+  private final int schedulerId;
+  private final int taskCommId;
+
+  /**
+   * @param containerId id of the container the event refers to
+   * @param nodeId node hosting the container
+   * @param containerToken token associated with the container
+   * @param type kind of request (launch or stop)
+   * @param launcherId id of the container launcher that should handle the event
+   * @param schedulerId id of the scheduler associated with the container
+   * @param taskCommId id of the task communicator associated with the container
+   */
+  public ContainerLauncherEvent(ContainerId containerId, NodeId nodeId,
+                                Token containerToken, ContainerLauncherEventType type,
+                                int launcherId,
+                                int schedulerId, int taskCommId) {
+    super(type);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.containerToken = containerToken;
+    this.launcherId = launcherId;
+    this.schedulerId = schedulerId;
+    this.taskCommId = taskCommId;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  public Token getContainerToken() {
+    return this.containerToken;
+  }
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
+  /**
+   * Returns the base event description followed by the container, node and plugin ids.
+   * The container token is left out.
+   */
+  @Override
+  public String toString() {
+    return super.toString() + " for container " + containerId + ", nodeId: "
+        + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId +
+        ", taskCommId=" + taskCommId;
+  }
+
+  /**
+   * Misspelled predecessor of {@link #toString()}, kept so existing callers keep compiling.
+   *
+   * @deprecated use {@link #toString()} instead
+   */
+  @Deprecated
+  public String toSrting() {
+    return toString();
+  }
+
+  /**
+   * Hash over containerId, containerToken and nodeId only, matching {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((containerId == null) ? 0 : containerId.hashCode());
+    result = prime * result
+        + ((containerToken == null) ? 0 : containerToken.hashCode());
+    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+    return result;
+  }
+
+  /**
+   * Two events are equal when they are of the same class and agree on containerId,
+   * containerToken and nodeId. The launcher, scheduler and task communicator ids and the
+   * event type are not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ContainerLauncherEvent other = (ContainerLauncherEvent) obj;
+    if (containerId == null) {
+      if (other.containerId != null)
+        return false;
+    } else if (!containerId.equals(other.containerId))
+      return false;
+    if (containerToken == null) {
+      if (other.containerToken != null)
+        return false;
+    } else if (!containerToken.equals(other.containerToken))
+      return false;
+    if (nodeId == null) {
+      if (other.nodeId != null)
+        return false;
+    } else if (!nodeId.equals(other.nodeId))
+      return false;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
new file mode 100644
index 0000000..079509c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+// TODO - Re-use the events in ContainerLauncher..
+public enum ContainerLauncherEventType {
+  CONTAINER_LAUNCH_REQUEST, // passed to super(...) by ContainerLauncherLaunchRequestEvent
+  CONTAINER_STOP_REQUEST // passed to super(...) by ContainerLauncherStopRequestEvent
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
new file mode 100644
index 0000000..411451e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+/**
+ * A {@link ContainerLauncherEvent} of type CONTAINER_LAUNCH_REQUEST that carries the
+ * {@link Container} to launch together with its {@link ContainerLaunchContext}.
+ */
+public class ContainerLauncherLaunchRequestEvent extends ContainerLauncherEvent {
+  private final ContainerLaunchContext clc;
+  private final Container container;
+
+  /**
+   * Builds the event; the container's id, node id and token are handed to the superclass.
+   *
+   * @param clc launch context, returned by {@link #getContainerLaunchContext()}
+   * @param container container to launch; dereferenced here, so it must not be null
+   * @param launcherId forwarded unchanged to the superclass
+   * @param schedulerId forwarded unchanged to the superclass
+   * @param taskCommId task communicator index for the container being launched
+   */
+  public ContainerLauncherLaunchRequestEvent(ContainerLaunchContext clc, Container container,
+                                             int launcherId, int schedulerId, int taskCommId) {
+    super(container.getId(), container.getNodeId(), container.getContainerToken(),
+        ContainerLauncherEventType.CONTAINER_LAUNCH_REQUEST, launcherId, schedulerId, taskCommId);
+    this.clc = clc;
+    this.container = container;
+  }
+
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return clc;
+  }
+
+  public Container getContainer() {
+    return this.container;
+  }
+
+  /** Equal when the superclass comparison, the launch context and the container all match. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    ContainerLauncherLaunchRequestEvent other = (ContainerLauncherLaunchRequestEvent) o;
+    if (clc == null ? other.clc != null : !clc.equals(other.clc)) {
+      return false;
+    }
+    return container == null ? other.container == null : container.equals(other.container);
+  }
+
+  /** Folds the launch context and container hashes into the superclass hash. */
+  @Override
+  public int hashCode() {
+    int hash = 7001 * super.hashCode() + (clc == null ? 0 : clc.hashCode());
+    return 7001 * hash + (container == null ? 0 : container.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
new file mode 100644
index 0000000..69e7d30
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+/** A {@link ContainerLauncherEvent} whose type is always CONTAINER_STOP_REQUEST. */
+public class ContainerLauncherStopRequestEvent extends ContainerLauncherEvent {
+
+  /** Passes every argument to the superclass, adding the CONTAINER_STOP_REQUEST type. */
+  public ContainerLauncherStopRequestEvent(ContainerId containerId, NodeId nodeId,
+      Token containerToken, int launcherId, int schedulerId, int taskCommId) {
+    super(containerId, nodeId, containerToken, ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        launcherId, schedulerId, taskCommId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
deleted file mode 100644
index dc50c37..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
-
-  private final ContainerId containerId;
-  private final NodeId nodeId;
-  private final Token containerToken;
-  private final int launcherId;
-  private final int schedulerId;
-  private final int taskCommId;
-
-  public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
-                             Token containerToken, NMCommunicatorEventType type, int launcherId,
-                             int schedulerId, int taskCommId) {
-    super(type);
-    this.containerId = containerId;
-    this.nodeId = nodeId;
-    this.containerToken = containerToken;
-    this.launcherId = launcherId;
-    this.schedulerId = schedulerId;
-    this.taskCommId = taskCommId;
-  }
-
-  public ContainerId getContainerId() {
-    return this.containerId;
-  }
-
-  public NodeId getNodeId() {
-    return this.nodeId;
-  }
-
-  public Token getContainerToken() {
-    return this.containerToken;
-  }
-
-  public int getLauncherId() {
-    return launcherId;
-  }
-
-  public int getSchedulerId() {
-    return schedulerId;
-  }
-
-  public int getTaskCommId() {
-    return taskCommId;
-  }
-
-  public String toSrting() {
-    return super.toString() + " for container " + containerId + ", nodeId: "
-        + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId +
-        ", taskCommId=" + taskCommId;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result
-        + ((containerId == null) ? 0 : containerId.hashCode());
-    result = prime * result
-        + ((containerToken == null) ? 0 : containerToken.hashCode());
-    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
-    if (containerId == null) {
-      if (other.containerId != null)
-        return false;
-    } else if (!containerId.equals(other.containerId))
-      return false;
-    if (containerToken == null) {
-      if (other.containerToken != null)
-        return false;
-    } else if (!containerToken.equals(other.containerToken))
-      return false;
-    if (nodeId == null) {
-      if (other.nodeId != null)
-        return false;
-    } else if (!nodeId.equals(other.nodeId))
-      return false;
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
deleted file mode 100644
index 9f3d989..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-// TODO - Re-use the events in ContainerLauncher..
-public enum NMCommunicatorEventType {
-  CONTAINER_LAUNCH_REQUEST,
-  CONTAINER_STOP_REQUEST
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
deleted file mode 100644
index c57b6be..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-
-public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
-
-  private final ContainerLaunchContext clc;
-  private final Container container;
-  // The task communicator index for the specific container being launched.
-
-  public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
-      Container container, int launcherId, int schedulerId, int taskCommId) {
-    super(container.getId(), container.getNodeId(), container
-        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST,
-        launcherId, schedulerId, taskCommId);
-    this.clc = clc;
-    this.container = container;
-  }
-
-  public ContainerLaunchContext getContainerLaunchContext() {
-    return this.clc;
-  }
-
-  public Container getContainer() {
-    return container;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-
-    NMCommunicatorLaunchRequestEvent that = (NMCommunicatorLaunchRequestEvent) o;
-
-    if (clc != null ? !clc.equals(that.clc) : that.clc != null) {
-      return false;
-    }
-    if (container != null ? !container.equals(that.container) : that.container != null) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    result = 7001 * result + (clc != null ? clc.hashCode() : 0);
-    result = 7001 * result + (container != null ? container.hashCode() : 0);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
deleted file mode 100644
index 352f450..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
-
-public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
-
-  public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken, int launcherId, int schedulerId, int taskCommId) {
-    super(containerId, nodeId, containerToken,
-        NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 2a9797f..37aa96b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -33,7 +33,7 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 public class TaskSchedulerContextImpl implements TaskSchedulerContext {
 
-  private final TaskSchedulerEventHandler tseh;
+  private final TaskSchedulerManager taskSchedulerManager;
   private final AppContext appContext;
   private final int schedulerId;
   private final String trackingUrl;
@@ -42,11 +42,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
   private final int clientPort;
   private final UserPayload initialUserPayload;
 
-  public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
+  public TaskSchedulerContextImpl(TaskSchedulerManager taskSchedulerManager, AppContext appContext,
                                   int schedulerId, String trackingUrl, long customClusterIdentifier,
                                   String appHostname, int clientPort,
                                   UserPayload initialUserPayload) {
-    this.tseh = tseh;
+    this.taskSchedulerManager = taskSchedulerManager;
     this.appContext = appContext;
     this.schedulerId = schedulerId;
     this.trackingUrl = trackingUrl;
@@ -62,54 +62,55 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
   // taskAllocated() upcall and deallocateTask() downcall
   @Override
   public void taskAllocated(Object task, Object appCookie, Container container) {
-    tseh.taskAllocated(schedulerId, task, appCookie, container);
+    taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container);
   }
 
   @Override
   public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
-    tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+    taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
   }
 
   @Override
   public void containerBeingReleased(ContainerId containerId) {
-    tseh.containerBeingReleased(schedulerId, containerId);
+    taskSchedulerManager.containerBeingReleased(schedulerId, containerId);
   }
 
   @Override
   public void nodesUpdated(List<NodeReport> updatedNodes) {
-    tseh.nodesUpdated(schedulerId, updatedNodes);
+    taskSchedulerManager.nodesUpdated(schedulerId, updatedNodes);
   }
 
   @Override
   public void appShutdownRequested() {
-    tseh.appShutdownRequested(schedulerId);
+    taskSchedulerManager.appShutdownRequested(schedulerId);
   }
 
   @Override
   public void setApplicationRegistrationData(Resource maxContainerCapability,
                                              Map<ApplicationAccessType, String> appAcls,
                                              ByteBuffer clientAMSecretKey) {
-    tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+    taskSchedulerManager.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls,
+        clientAMSecretKey);
   }
 
   @Override
   public void onError(Throwable t) {
-    tseh.onError(schedulerId, t);
+    taskSchedulerManager.onError(schedulerId, t);
   }
 
   @Override
   public float getProgress() {
-    return tseh.getProgress(schedulerId);
+    return taskSchedulerManager.getProgress(schedulerId);
   }
 
   @Override
   public void preemptContainer(ContainerId containerId) {
-    tseh.preemptContainer(schedulerId, containerId);
+    taskSchedulerManager.preemptContainer(schedulerId, containerId);
   }
 
   @Override
   public AppFinalStatus getFinalAppStatus() {
-    return tseh.getFinalAppStatus();
+    return taskSchedulerManager.getFinalAppStatus();
   }
 
   @Override
@@ -130,7 +131,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
 
   @Override
   public ContainerSignatureMatcher getContainerSignatureMatcher() {
-    return tseh.getContainerSignatureMatcher();
+    return taskSchedulerManager.getContainerSignatureMatcher();
   }
 
   @Override