You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:52 UTC

[5/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
deleted file mode 100644
index 76d870c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
-import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
-import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
-import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
-import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
-import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
-import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
-import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
-import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
-import org.apache.tez.dag.app.web.WebUIService;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-
-import com.google.common.base.Preconditions;
-
-
-public class TaskSchedulerEventHandler extends AbstractService implements
-                                               EventHandler<AMSchedulerEvent> {
-  static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerEventHandler.class);
-
-  static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
-  static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
-
-  protected final AppContext appContext;
-  @SuppressWarnings("rawtypes")
-  private final EventHandler eventHandler;
-  private final String historyUrl;
-  private DAGAppMaster dagAppMaster;
-  private Map<ApplicationAccessType, String> appAcls = null;
-  private Thread eventHandlingThread;
-  private volatile boolean stopEventHandling;
-  // Has a signal (SIGTERM etc) been issued?
-  protected volatile boolean isSignalled = false;
-  final DAGClientServer clientService;
-  private final ContainerSignatureMatcher containerSignatureMatcher;
-  private int cachedNodeCount = -1;
-  private AtomicBoolean shouldUnregisterFlag =
-      new AtomicBoolean(false);
-  private final WebUIService webUI;
-  private final NamedEntityDescriptor[] taskSchedulerDescriptors;
-  protected final TaskScheduler[]taskSchedulers;
-  protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
-
-  // Single executor service shared by all Schedulers for context callbacks
-  @VisibleForTesting
-  final ExecutorService appCallbackExecutor;
-
-  private final boolean isPureLocalMode;
-  // If running in non local-only mode, the YARN task scheduler will always run to take care of
-  // registration with YARN and heartbeats to YARN.
-  // Splitting registration and heartbeats is not straight-forward due to the taskScheduler being
-  // tied to a ContainerRequestType.
-  // Custom AppIds to avoid container conflicts if there's multiple sources
-  private final long SCHEDULER_APP_ID_BASE = 111101111;
-  private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
-
-  BlockingQueue<AMSchedulerEvent> eventQueue
-                              = new LinkedBlockingQueue<AMSchedulerEvent>();
-
-  // Not tracking container / task to schedulerId. Instead relying on everything flowing through
-  // the system and being propagated back via events.
-
-  /**
-   *
-   * @param appContext
-   * @param clientService
-   * @param eventHandler
-   * @param containerSignatureMatcher
-   * @param webUI
-   * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
-   *                         An empty list defaults to using the YarnTaskScheduler as the only source.
-   * @param isPureLocalMode whether the AM is running in local mode
-   */
-  @SuppressWarnings("rawtypes")
-  public TaskSchedulerEventHandler(AppContext appContext,
-      DAGClientServer clientService, EventHandler eventHandler, 
-      ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
-    super(TaskSchedulerEventHandler.class.getName());
-    Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
-        "TaskSchedulerDescriptors must be specified");
-    this.appContext = appContext;
-    this.eventHandler = eventHandler;
-    this.clientService = clientService;
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.webUI = webUI;
-    this.historyUrl = getHistoryUrl();
-    this.isPureLocalMode = isPureLocalMode;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    if (this.webUI != null) {
-      this.webUI.setHistoryUrl(this.historyUrl);
-    }
-
-    this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
-
-    taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
-    taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
-  }
-
-  public Map<ApplicationAccessType, String> getApplicationAcls() {
-    return appAcls;
-  }
-
-  public void setSignalled(boolean isSignalled) {
-    this.isSignalled = isSignalled;
-    LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
-  }
-
-  public int getNumClusterNodes() {
-    return cachedNodeCount;
-  }
-  
-  public Resource getAvailableResources(int schedulerId) {
-    return taskSchedulers[schedulerId].getAvailableResources();
-  }
-
-  public Resource getTotalResources(int schedulerId) {
-    return taskSchedulers[schedulerId].getTotalResources();
-  }
-
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(
-        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
-            .setDaemon(true)
-            .build());
-  }
-
-  public synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    LOG.info("Processing the event " + sEvent.toString());
-    switch (sEvent.getType()) {
-    case S_TA_LAUNCH_REQUEST:
-      handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
-      break;
-    case S_TA_ENDED: // TaskAttempt considered complete.
-      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
-      switch(event.getState()) {
-      case FAILED:
-      case KILLED:
-        handleTAUnsuccessfulEnd(event);
-        break;
-      case SUCCEEDED:
-        handleTASucceeded(event);
-        break;
-      default:
-        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
-      }
-      break;
-    case S_CONTAINER_DEALLOCATE:
-      handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
-      break;
-    case S_NODE_UNBLACKLISTED:
-      // fall through
-    case S_NODE_BLACKLISTED:
-      handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
-      break;
-    case S_NODE_UNHEALTHY:
-      break;
-    case S_NODE_HEALTHY:
-      // Consider changing this to work like BLACKLISTING.
-      break;
-    default:
-      break;
-    }
-  }
-
-  @Override
-  public void handle(AMSchedulerEvent event) {
-    int qSize = eventQueue.size();
-    if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
-    }
-    int remCapacity = eventQueue.remainingCapacity();
-    if (remCapacity < 1000) {
-      LOG.warn("Very low remaining capacity in the event-queue "
-          + "of RMContainerAllocator: " + remCapacity);
-    }
-    try {
-      eventQueue.put(event);
-    } catch (InterruptedException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private void sendEvent(Event<?> event) {
-    eventHandler.handle(event);
-  }
-
-  private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
-    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-      taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
-    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
-      taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
-    } else {
-      throw new TezUncheckedException("Invalid event type: " + event.getType());
-    }
-  }
-
-  private void handleContainerDeallocate(
-                                  AMSchedulerEventDeallocateContainer event) {
-    ContainerId containerId = event.getContainerId();
-    // TODO what happens to the task that was connected to this container?
-    // current assumption is that it will eventually call handleTaStopRequest
-    //TaskAttempt taskAttempt = (TaskAttempt)
-    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
-    // TODO does this container need to be stopped via C_STOP_REQUEST
-    sendEvent(new AMContainerEventStopRequest(containerId));
-  }
-
-  private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
-    TaskAttempt attempt = event.getAttempt();
-    // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
-    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
-        .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
-    // use stored value of container id in case the scheduler has removed this
-    // assignment because the task has been deallocated earlier.
-    // retroactive case
-    ContainerId attemptContainerId = attempt.getAssignedContainerID();
-
-    if(!wasContainerAllocated) {
-      LOG.info("Task: " + attempt.getID() +
-          " has no container assignment in the scheduler");
-      if (attemptContainerId != null) {
-        LOG.error("No container allocated to task: " + attempt.getID()
-            + " according to scheduler. Task reported container id: "
-            + attemptContainerId);
-      }
-    }
-
-    if (attemptContainerId != null) {
-      // TODO either ways send the necessary events
-      // Ask the container to stop.
-      sendEvent(new AMContainerEventStopRequest(attemptContainerId));
-      // Inform the Node - the task has asked to be STOPPED / has already
-      // stopped.
-      // AMNodeImpl blacklisting logic does not account for KILLED attempts.
-      sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
-          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
-          attemptContainerId,
-          attempt.getID(), event.getState() == TaskAttemptState.FAILED));
-    }
-  }
-
-  private void handleTASucceeded(AMSchedulerEventTAEnded event) {
-    TaskAttempt attempt = event.getAttempt();
-    ContainerId usedContainerId = event.getUsedContainerId();
-
-    // This could be null if a task fails / is killed before a container is
-    // assigned to it.
-    if (event.getUsedContainerId() != null) {
-      sendEvent(new AMContainerEventTASucceeded(usedContainerId,
-          event.getAttemptID()));
-      sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
-          get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
-          event.getAttemptID()));
-    }
-
-    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
-        true, null, event.getDiagnostics());
-    if (!wasContainerAllocated) {
-      LOG.error("De-allocated successful task: " + attempt.getID()
-          + ", but TaskScheduler reported no container assigned to task");
-    }
-  }
-
-  private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
-    TaskAttempt taskAttempt = event.getTaskAttempt();
-    TaskLocationHint locationHint = event.getLocationHint();
-    String hosts[] = null;
-    String racks[] = null;
-    if (locationHint != null) {
-      TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
-      if (taskAffinity != null) {
-        Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
-        Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity 
-            + " for attempt: " + taskAttempt.getID());
-        int taskIndex = taskAffinity.getTaskIndex(); 
-        Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(), 
-            "Invalid taskIndex in task based affinity " + taskAffinity 
-            + " for attempt: " + taskAttempt.getID());
-        TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
-        if (affinityAttempt != null) {
-          Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
-          taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
-              event.getCapability(),
-              affinityAttempt.getAssignedContainerID(),
-              Priority.newInstance(event.getPriority()),
-              event.getContainerContext(),
-              event);
-          return;
-        }
-        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
-            + " but no locality information exists for it. Ignoring hint.");
-        // fall through with null hosts/racks
-      } else {
-        hosts = (locationHint.getHosts() != null) ? locationHint
-            .getHosts().toArray(
-                new String[locationHint.getHosts().size()]) : null;
-        racks = (locationHint.getRacks() != null) ? locationHint.getRacks()
-            .toArray(new String[locationHint.getRacks().size()]) : null;
-      }
-    }
-
-    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
-        event.getCapability(),
-        hosts,
-        racks,
-        Priority.newInstance(event.getPriority()),
-        event.getContainerContext(),
-        event);
-  }
-
-  @VisibleForTesting
-  TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
-                                                   AppContext appContext,
-                                                   NamedEntityDescriptor taskSchedulerDescriptor,
-                                                   long customAppIdIdentifier,
-                                                   int schedulerId) {
-    TaskSchedulerContext rawContext =
-        new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
-            customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
-    TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
-    String schedulerName = taskSchedulerDescriptor.getEntityName();
-    if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
-      return createYarnTaskScheduler(wrappedContext, schedulerId);
-    } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
-      return createUberTaskScheduler(wrappedContext, schedulerId);
-    } else {
-      return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
-    }
-  }
-
-  @VisibleForTesting
-  TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
-    return new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
-  }
-
-  @VisibleForTesting
-  TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                        int schedulerId) {
-    LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
-    return new YarnTaskSchedulerService(taskSchedulerContext);
-  }
-
-  @VisibleForTesting
-  TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                        int schedulerId) {
-    LOG.info("Creating TaskScheduler: Local TaskScheduler");
-    return new LocalTaskSchedulerService(taskSchedulerContext);
-  }
-
-  @SuppressWarnings("unchecked")
-  TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                          NamedEntityDescriptor taskSchedulerDescriptor,
-                                          int schedulerId) {
-    LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
-        taskSchedulerDescriptor.getClassName());
-    Class<? extends TaskScheduler> taskSchedulerClazz =
-        (Class<? extends TaskScheduler>) ReflectionUtils
-            .getClazz(taskSchedulerDescriptor.getClassName());
-    try {
-      Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
-          .getConstructor(TaskSchedulerContext.class);
-      return ctor.newInstance(taskSchedulerContext);
-    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  @VisibleForTesting
-  protected void instantiateSchedulers(String host, int port, String trackingUrl,
-                                       AppContext appContext) {
-    // Iterate over the list and create all the taskSchedulers
-    int j = 0;
-    for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
-      long customAppIdIdentifier;
-      if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
-          TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
-        customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
-      } else {
-        customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
-      }
-      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
-          customAppIdIdentifier);
-      taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
-      taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
-    }
-  }
-
-  
-  @Override
-  public synchronized void serviceStart() {
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
-    dagAppMaster = appContext.getAppMaster();
-    // if web service is enabled then set tracking url. else disable it (value = "").
-    // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
-    // always try to connect to AM and proxy the response. hence it wont work if the webUIService
-    // is not enabled.
-    String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
-    instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
-
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulerServiceWrappers[i].init(getConfig());
-      taskSchedulerServiceWrappers[i].start();
-      if (shouldUnregisterFlag.get()) {
-        // Flag may have been set earlier when task scheduler was not initialized
-        // External services could need to talk to some other entity.
-        taskSchedulers[i].setShouldUnregister();
-      }
-    }
-
-    this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
-      @Override
-      public void run() {
-
-        AMSchedulerEvent event;
-
-        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
-          try {
-            if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
-              notifyForTest();
-            }
-            event = TaskSchedulerEventHandler.this.eventQueue.take();
-          } catch (InterruptedException e) {
-            if(!stopEventHandling) {
-              LOG.warn("Continuing after interrupt : ", e);
-            }
-            continue;
-          }
-
-          try {
-            handleEvent(event);
-          } catch (Throwable t) {
-            LOG.error("Error in handling event type " + event.getType()
-                + " to the TaskScheduler", t);
-            // Kill the AM.
-            sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
-            return;
-          } finally {
-            notifyForTest();
-          }
-        }
-      }
-    };
-    this.eventHandlingThread.start();
-  }
-  
-  protected void notifyForTest() {
-  }
-
-  public void initiateStop() {
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulers[i].initiateStop();
-    }
-  }
-
-  @Override
-  public void serviceStop() throws InterruptedException {
-    synchronized(this) {
-      this.stopEventHandling = true;
-      if (eventHandlingThread != null)
-        eventHandlingThread.interrupt();
-    }
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      if (taskSchedulers[i] != null) {
-        taskSchedulerServiceWrappers[i].stop();
-      }
-    }
-    LOG.info("Shutting down AppCallbackExecutor");
-    appCallbackExecutor.shutdownNow();
-    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
-  }
-
-  // TaskSchedulerAppCallback methods with schedulerId, where relevant
-  public synchronized void taskAllocated(int schedulerId, Object task,
-                                           Object appCookie,
-                                           Container container) {
-    AMSchedulerEventTALaunchRequest event =
-        (AMSchedulerEventTALaunchRequest) appCookie;
-    ContainerId containerId = container.getId();
-    if (appContext.getAllContainers()
-        .addContainerIfNew(container, schedulerId, event.getLauncherId(),
-            event.getTaskCommId())) {
-      appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
-      sendEvent(new AMNodeEventContainerAllocated(container
-          .getNodeId(), schedulerId, container.getId()));
-    }
-
-
-    TaskAttempt taskAttempt = event.getTaskAttempt();
-    // TODO - perhaps check if the task still needs this container
-    // because the deallocateTask downcall may have raced with the
-    // taskAllocated() upcall
-    assert task.equals(taskAttempt);
- 
-    if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
-      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
-          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
-    }
-    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
-    sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
-        event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
-            .getContainerContext().getCredentials(), event.getPriority()));
-  }
-
-  public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
-    // SchedulerId isn't used here since no node updates are sent out
-    // Inform the Containers about completion.
-    AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
-    if (amContainer != null) {
-      String message = "Container completed. ";
-      TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
-      int exitStatus = containerStatus.getExitStatus();
-      if (exitStatus == ContainerExitStatus.PREEMPTED) {
-        message = "Container preempted externally. ";
-        errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
-      } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
-        message = "Container disk failed. ";
-        errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
-      } else if (exitStatus != ContainerExitStatus.SUCCESS){
-        message = "Container failed, exitCode=" + exitStatus + ". ";
-      }
-      if (containerStatus.getDiagnostics() != null) {
-        message += containerStatus.getDiagnostics();
-      }
-      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
-    }
-  }
-
-  public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
-    // SchedulerId isn't used here since no node updates are sent out
-    AMContainer amContainer = appContext.getAllContainers().get(containerId);
-    if (amContainer != null) {
-      sendEvent(new AMContainerEventStopRequest(containerId));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
-    for (NodeReport nr : updatedNodes) {
-      // Scheduler will find out from the node, if at all.
-      // Relying on the RM to not allocate containers on an unhealthy node.
-      eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
-    }
-  }
-
-  public synchronized void appShutdownRequested(int schedulerId) {
-    // This can happen if the RM has been restarted. If it is in that state,
-    // this application must clean itself up.
-    LOG.info("App shutdown requested by scheduler {}", schedulerId);
-    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
-  }
-
-  public synchronized void setApplicationRegistrationData(
-      int schedulerId,
-      Resource maxContainerCapability,
-      Map<ApplicationAccessType, String> appAcls, 
-      ByteBuffer clientAMSecretKey) {
-    this.appContext.getClusterInfo().setMaxContainerCapability(
-        maxContainerCapability);
-    this.appAcls = appAcls;
-    this.clientService.setClientAMSecretKey(clientAMSecretKey);
-  }
-
-  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
-  // TaskScheduler uses a separate thread for it's callbacks. Since this method
-  // returns a value which is required, the TaskScheduler wait for the call to
-  // complete and can hence lead to a deadlock if called from within a TSEH lock.
-  public AppFinalStatus getFinalAppStatus() {
-    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-    StringBuffer sb = new StringBuffer();
-    if (dagAppMaster == null) {
-      finishState = FinalApplicationStatus.UNDEFINED;
-      sb.append("App not yet initialized");
-    } else {
-      DAGAppMasterState appMasterState = dagAppMaster.getState();
-      if (appMasterState == DAGAppMasterState.SUCCEEDED) {
-        finishState = FinalApplicationStatus.SUCCEEDED;
-      } else if (appMasterState == DAGAppMasterState.KILLED
-          || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
-        finishState = FinalApplicationStatus.KILLED;
-      } else if (appMasterState == DAGAppMasterState.FAILED
-          || appMasterState == DAGAppMasterState.ERROR) {
-        finishState = FinalApplicationStatus.FAILED;
-      } else {
-        finishState = FinalApplicationStatus.UNDEFINED;
-      }
-      List<String> diagnostics = dagAppMaster.getDiagnostics();
-      if(diagnostics != null) {
-        for (String s : diagnostics) {
-          sb.append(s).append("\n");
-        }
-      }
-    }
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Setting job diagnostics to " + sb.toString());
-    }
-
-    // if history url is set use the same, if historyUrl is set to "" then rm ui disables the
-    // history url
-    return new AppFinalStatus(finishState, sb.toString(), historyUrl);
-  }
-
-
-
-  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
-  // TaskScheduler uses a separate thread for it's callbacks. Since this method
-  // returns a value which is required, the TaskScheduler wait for the call to
-  // complete and can hence lead to a deadlock if called from within a TSEH lock.
-  public float getProgress(int schedulerId) {
-    // at this point allocate has been called and so node count must be available
-    // may change after YARN-1722
-    // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
-    // node updates from the cluster.
-
-    // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
-
-    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
-    int nodeCount = taskSchedulers[0].getClusterNodeCount();
-    if (nodeCount != cachedNodeCount) {
-      cachedNodeCount = nodeCount;
-      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
-    }
-    return dagAppMaster.getProgress();
-  }
-
-  public void onError(int schedulerId, Throwable t) {
-    LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
-    sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
-  }
-
-  public void dagCompleted() {
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulers[i].dagComplete();
-    }
-  }
-
-  public void dagSubmitted() {
-    // Nothing to do right now. Indicates that a new DAG has been submitted and
-    // the context has updated information.
-  }
-
-  public void preemptContainer(int schedulerId, ContainerId containerId) {
-    // TODO Why is this making a call back into the scheduler, when the call is originating from there.
-    // An AMContainer instance should already exist if an attempt is being made to preempt it
-    AMContainer amContainer = appContext.getAllContainers().get(containerId);
-    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
-    // Inform the Containers about completion.
-    sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
-        "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
-  }
-
-  public void setShouldUnregisterFlag() {
-    LOG.info("TaskScheduler notified that it should unregister from RM");
-    this.shouldUnregisterFlag.set(true);
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      if (this.taskSchedulers[i] != null) {
-        this.taskSchedulers[i].setShouldUnregister();
-      }
-    }
-  }
-
-  public ContainerSignatureMatcher getContainerSignatureMatcher() {
-    return containerSignatureMatcher;
-  }
-
-  public boolean hasUnregistered() {
-    boolean result = true;
-    for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      result = result & this.taskSchedulers[i].hasUnregistered();
-      if (result == false) {
-        return result;
-      }
-    }
-    return result;
-  }
-
-  @VisibleForTesting
-  public String getHistoryUrl() {
-    Configuration config = this.appContext.getAMConf();
-    String historyUrl = "";
-
-    String loggingClass =  config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
-    String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
-            TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
-    String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
-
-
-    if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
-        !historyUrlTemplate.isEmpty() &&
-        !historyUrlBase.isEmpty()) {
-      // replace the placeholders, while tolerating extra or missing "/" in input. replace all
-      // instances of consecutive "/" with single (except for the http(s):// case
-      historyUrl = historyUrlTemplate
-          .replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
-          .replaceAll(HISTORY_URL_BASE, historyUrlBase)
-          .replaceAll("([^:])/{2,}", "$1/");
-
-      // make sure we have a valid scheme
-      if (!historyUrl.startsWith("http")) {
-        historyUrl = "http://" + historyUrl;
-      }
-    }
-
-    return historyUrl;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
new file mode 100644
index 0000000..29143a2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -0,0 +1,786 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
+import org.apache.tez.dag.api.client.DAGClientServer;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.app.web.WebUIService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+import com.google.common.base.Preconditions;
+
+
+public class TaskSchedulerManager extends AbstractService implements
+                                               EventHandler<AMSchedulerEvent> {
+  static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerManager.class);
+
+  static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
+  static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
+
+  protected final AppContext appContext;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final String historyUrl;
+  private DAGAppMaster dagAppMaster;
+  private Map<ApplicationAccessType, String> appAcls = null;
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+  final DAGClientServer clientService;
+  private final ContainerSignatureMatcher containerSignatureMatcher;
+  private int cachedNodeCount = -1;
+  private AtomicBoolean shouldUnregisterFlag =
+      new AtomicBoolean(false);
+  private final WebUIService webUI;
+  private final NamedEntityDescriptor[] taskSchedulerDescriptors;
+  protected final TaskScheduler[]taskSchedulers;
+  protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
+
+  // Single executor service shared by all Schedulers for context callbacks
+  @VisibleForTesting
+  final ExecutorService appCallbackExecutor;
+
+  private final boolean isPureLocalMode;
+  // If running in non local-only mode, the YARN task scheduler will always run to take care of
+  // registration with YARN and heartbeats to YARN.
+  // Splitting registration and heartbeats is not straight-forward due to the taskScheduler being
+  // tied to a ContainerRequestType.
+  // Custom AppIds to avoid container conflicts if there's multiple sources
+  private final long SCHEDULER_APP_ID_BASE = 111101111;
+  private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+
+  BlockingQueue<AMSchedulerEvent> eventQueue
+                              = new LinkedBlockingQueue<AMSchedulerEvent>();
+
+  // Not tracking container / task to schedulerId. Instead relying on everything flowing through
+  // the system and being propagated back via events.
+
+  /**
+   *
+   * @param appContext
+   * @param clientService
+   * @param eventHandler
+   * @param containerSignatureMatcher
+   * @param webUI
+   * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
+   *                         An empty list defaults to using the YarnTaskScheduler as the only source.
+   * @param isPureLocalMode whether the AM is running in local mode
+   */
+  @SuppressWarnings("rawtypes")
+  public TaskSchedulerManager(AppContext appContext,
+                              DAGClientServer clientService, EventHandler eventHandler,
+                              ContainerSignatureMatcher containerSignatureMatcher,
+                              WebUIService webUI,
+                              List<NamedEntityDescriptor> schedulerDescriptors,
+                              boolean isPureLocalMode) {
+    super(TaskSchedulerManager.class.getName());
+    Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
+        "TaskSchedulerDescriptors must be specified");
+    this.appContext = appContext;
+    this.eventHandler = eventHandler;
+    this.clientService = clientService;
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.webUI = webUI;
+    this.historyUrl = getHistoryUrl();
+    this.isPureLocalMode = isPureLocalMode;
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    if (this.webUI != null) {
+      this.webUI.setHistoryUrl(this.historyUrl);
+    }
+
+    this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
+
+    taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
+    taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
+  }
+
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return appAcls;
+  }
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
+  }
+
+  public int getNumClusterNodes() {
+    return cachedNodeCount;
+  }
+  
+  public Resource getAvailableResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getAvailableResources();
+  }
+
+  public Resource getTotalResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getTotalResources();
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
+            .setDaemon(true)
+            .build());
+  }
+
+  public synchronized void handleEvent(AMSchedulerEvent sEvent) {
+    LOG.info("Processing the event " + sEvent.toString());
+    switch (sEvent.getType()) {
+    case S_TA_LAUNCH_REQUEST:
+      handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
+      break;
+    case S_TA_ENDED: // TaskAttempt considered complete.
+      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
+      switch(event.getState()) {
+      case FAILED:
+      case KILLED:
+        handleTAUnsuccessfulEnd(event);
+        break;
+      case SUCCEEDED:
+        handleTASucceeded(event);
+        break;
+      default:
+        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+      }
+      break;
+    case S_CONTAINER_DEALLOCATE:
+      handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
+      break;
+    case S_NODE_UNBLACKLISTED:
+      // fall through
+    case S_NODE_BLACKLISTED:
+      handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
+      break;
+    case S_NODE_UNHEALTHY:
+      break;
+    case S_NODE_HEALTHY:
+      // Consider changing this to work like BLACKLISTING.
+      break;
+    default:
+      break;
+    }
+  }
+
+  @Override
+  public void handle(AMSchedulerEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of RMContainerAllocator: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    eventHandler.handle(event);
+  }
+
+  private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
+    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+      taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
+    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+      taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
+    } else {
+      throw new TezUncheckedException("Invalid event type: " + event.getType());
+    }
+  }
+
+  private void handleContainerDeallocate(
+                                  AMSchedulerEventDeallocateContainer event) {
+    ContainerId containerId = event.getContainerId();
+    // TODO what happens to the task that was connected to this container?
+    // current assumption is that it will eventually call handleTaStopRequest
+    //TaskAttempt taskAttempt = (TaskAttempt)
+    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
+    // TODO does this container need to be stopped via C_STOP_REQUEST
+    sendEvent(new AMContainerEventStopRequest(containerId));
+  }
+
+  private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
+    TaskAttempt attempt = event.getAttempt();
+    // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
+        .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
+    // use stored value of container id in case the scheduler has removed this
+    // assignment because the task has been deallocated earlier.
+    // retroactive case
+    ContainerId attemptContainerId = attempt.getAssignedContainerID();
+
+    if(!wasContainerAllocated) {
+      LOG.info("Task: " + attempt.getID() +
+          " has no container assignment in the scheduler");
+      if (attemptContainerId != null) {
+        LOG.error("No container allocated to task: " + attempt.getID()
+            + " according to scheduler. Task reported container id: "
+            + attemptContainerId);
+      }
+    }
+
+    if (attemptContainerId != null) {
+      // TODO either ways send the necessary events
+      // Ask the container to stop.
+      sendEvent(new AMContainerEventStopRequest(attemptContainerId));
+      // Inform the Node - the task has asked to be STOPPED / has already
+      // stopped.
+      // AMNodeImpl blacklisting logic does not account for KILLED attempts.
+      sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
+          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
+          attemptContainerId,
+          attempt.getID(), event.getState() == TaskAttemptState.FAILED));
+    }
+  }
+
+  private void handleTASucceeded(AMSchedulerEventTAEnded event) {
+    TaskAttempt attempt = event.getAttempt();
+    ContainerId usedContainerId = event.getUsedContainerId();
+
+    // This could be null if a task fails / is killed before a container is
+    // assigned to it.
+    if (event.getUsedContainerId() != null) {
+      sendEvent(new AMContainerEventTASucceeded(usedContainerId,
+          event.getAttemptID()));
+      sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
+          get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
+          event.getAttemptID()));
+    }
+
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+        true, null, event.getDiagnostics());
+    if (!wasContainerAllocated) {
+      LOG.error("De-allocated successful task: " + attempt.getID()
+          + ", but TaskScheduler reported no container assigned to task");
+    }
+  }
+
+  private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
+    TaskAttempt taskAttempt = event.getTaskAttempt();
+    TaskLocationHint locationHint = event.getLocationHint();
+    String hosts[] = null;
+    String racks[] = null;
+    if (locationHint != null) {
+      TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
+      if (taskAffinity != null) {
+        Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
+        Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity 
+            + " for attempt: " + taskAttempt.getID());
+        int taskIndex = taskAffinity.getTaskIndex(); 
+        Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(), 
+            "Invalid taskIndex in task based affinity " + taskAffinity 
+            + " for attempt: " + taskAttempt.getID());
+        TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
+        if (affinityAttempt != null) {
+          Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
+          taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
+              event.getCapability(),
+              affinityAttempt.getAssignedContainerID(),
+              Priority.newInstance(event.getPriority()),
+              event.getContainerContext(),
+              event);
+          return;
+        }
+        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
+            + " but no locality information exists for it. Ignoring hint.");
+        // fall through with null hosts/racks
+      } else {
+        hosts = (locationHint.getHosts() != null) ? locationHint
+            .getHosts().toArray(
+                new String[locationHint.getHosts().size()]) : null;
+        racks = (locationHint.getRacks() != null) ? locationHint.getRacks()
+            .toArray(new String[locationHint.getRacks().size()]) : null;
+      }
+    }
+
+    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
+        event.getCapability(),
+        hosts,
+        racks,
+        Priority.newInstance(event.getPriority()),
+        event.getContainerContext(),
+        event);
+  }
+
+  @VisibleForTesting
+  TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
+                                                   AppContext appContext,
+                                                   NamedEntityDescriptor taskSchedulerDescriptor,
+                                                   long customAppIdIdentifier,
+                                                   int schedulerId) {
+    TaskSchedulerContext rawContext =
+        new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+            customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
+    TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
+    String schedulerName = taskSchedulerDescriptor.getEntityName();
+    if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
+      return createYarnTaskScheduler(wrappedContext, schedulerId);
+    } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberTaskScheduler(wrappedContext, schedulerId);
+    } else {
+      return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
+    }
+  }
+
+  @VisibleForTesting
+  TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
+    return new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
+  }
+
+  @VisibleForTesting
+  TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                        int schedulerId) {
+    LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
+    return new YarnTaskSchedulerService(taskSchedulerContext);
+  }
+
+  @VisibleForTesting
+  TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                        int schedulerId) {
+    LOG.info("Creating TaskScheduler: Local TaskScheduler");
+    return new LocalTaskSchedulerService(taskSchedulerContext);
+  }
+
+  @SuppressWarnings("unchecked")
+  TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                          NamedEntityDescriptor taskSchedulerDescriptor,
+                                          int schedulerId) {
+    LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
+        taskSchedulerDescriptor.getClassName());
+    Class<? extends TaskScheduler> taskSchedulerClazz =
+        (Class<? extends TaskScheduler>) ReflectionUtils
+            .getClazz(taskSchedulerDescriptor.getClassName());
+    try {
+      Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
+          .getConstructor(TaskSchedulerContext.class);
+      return ctor.newInstance(taskSchedulerContext);
+    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @VisibleForTesting
+  protected void instantiateSchedulers(String host, int port, String trackingUrl,
+                                       AppContext appContext) {
+    // Iterate over the list and create all the taskSchedulers
+    int j = 0;
+    for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
+      long customAppIdIdentifier;
+      if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
+          TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
+        customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
+      } else {
+        customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
+      }
+      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
+          customAppIdIdentifier);
+      taskSchedulers[i] = createTaskScheduler(host, port,
+          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
+      taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
+    }
+  }
+
+  
+  @Override
+  public synchronized void serviceStart() {
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    dagAppMaster = appContext.getAppMaster();
+    // if web service is enabled then set tracking url. else disable it (value = "").
+    // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
+    // always try to connect to AM and proxy the response. hence it wont work if the webUIService
+    // is not enabled.
+    String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
+    instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulerServiceWrappers[i].init(getConfig());
+      taskSchedulerServiceWrappers[i].start();
+      if (shouldUnregisterFlag.get()) {
+        // Flag may have been set earlier when task scheduler was not initialized
+        // External services could need to talk to some other entity.
+        taskSchedulers[i].setShouldUnregister();
+      }
+    }
+
+    this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
+      @Override
+      public void run() {
+
+        AMSchedulerEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            if (TaskSchedulerManager.this.eventQueue.peek() == null) {
+              notifyForTest();
+            }
+            event = TaskSchedulerManager.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            if(!stopEventHandling) {
+              LOG.warn("Continuing after interrupt : ", e);
+            }
+            continue;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the TaskScheduler", t);
+            // Kill the AM.
+            sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
+            return;
+          } finally {
+            notifyForTest();
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+  }
+  
+  protected void notifyForTest() {
+  }
+
+  public void initiateStop() {
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulers[i].initiateStop();
+    }
+  }
+
+  @Override
+  public void serviceStop() throws InterruptedException {
+    synchronized(this) {
+      this.stopEventHandling = true;
+      if (eventHandlingThread != null)
+        eventHandlingThread.interrupt();
+    }
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (taskSchedulers[i] != null) {
+        taskSchedulerServiceWrappers[i].stop();
+      }
+    }
+    LOG.info("Shutting down AppCallbackExecutor");
+    appCallbackExecutor.shutdownNow();
+    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
+  }
+
+  // TaskSchedulerAppCallback methods with schedulerId, where relevant
+  public synchronized void taskAllocated(int schedulerId, Object task,
+                                           Object appCookie,
+                                           Container container) {
+    AMSchedulerEventTALaunchRequest event =
+        (AMSchedulerEventTALaunchRequest) appCookie;
+    ContainerId containerId = container.getId();
+    if (appContext.getAllContainers()
+        .addContainerIfNew(container, schedulerId, event.getLauncherId(),
+            event.getTaskCommId())) {
+      appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
+      sendEvent(new AMNodeEventContainerAllocated(container
+          .getNodeId(), schedulerId, container.getId()));
+    }
+
+
+    TaskAttempt taskAttempt = event.getTaskAttempt();
+    // TODO - perhaps check if the task still needs this container
+    // because the deallocateTask downcall may have raced with the
+    // taskAllocated() upcall
+    assert task.equals(taskAttempt);
+ 
+    if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
+          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
+    }
+    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
+    sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
+        event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
+            .getContainerContext().getCredentials(), event.getPriority()));
+  }
+
+  public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
+    // SchedulerId isn't used here since no node updates are sent out
+    // Inform the Containers about completion.
+    AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
+    if (amContainer != null) {
+      String message = "Container completed. ";
+      TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
+      int exitStatus = containerStatus.getExitStatus();
+      if (exitStatus == ContainerExitStatus.PREEMPTED) {
+        message = "Container preempted externally. ";
+        errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
+      } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
+        message = "Container disk failed. ";
+        errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+      } else if (exitStatus != ContainerExitStatus.SUCCESS){
+        message = "Container failed, exitCode=" + exitStatus + ". ";
+      }
+      if (containerStatus.getDiagnostics() != null) {
+        message += containerStatus.getDiagnostics();
+      }
+      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
+    }
+  }
+
+  public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
+    // SchedulerId isn't used here since no node updates are sent out
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    if (amContainer != null) {
+      sendEvent(new AMContainerEventStopRequest(containerId));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
+    for (NodeReport nr : updatedNodes) {
+      // Scheduler will find out from the node, if at all.
+      // Relying on the RM to not allocate containers on an unhealthy node.
+      eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
+    }
+  }
+
+  public synchronized void appShutdownRequested(int schedulerId) {
+    // This can happen if the RM has been restarted. If it is in that state,
+    // this application must clean itself up.
+    LOG.info("App shutdown requested by scheduler {}", schedulerId);
+    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
+  }
+
+  public synchronized void setApplicationRegistrationData(
+      int schedulerId,
+      Resource maxContainerCapability,
+      Map<ApplicationAccessType, String> appAcls, 
+      ByteBuffer clientAMSecretKey) {
+    this.appContext.getClusterInfo().setMaxContainerCapability(
+        maxContainerCapability);
+    this.appAcls = appAcls;
+    this.clientService.setClientAMSecretKey(clientAMSecretKey);
+  }
+
+  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
+  // TaskScheduler uses a separate thread for it's callbacks. Since this method
+  // returns a value which is required, the TaskScheduler wait for the call to
+  // complete and can hence lead to a deadlock if called from within a TSEH lock.
+  public AppFinalStatus getFinalAppStatus() {
+    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+    StringBuffer sb = new StringBuffer();
+    if (dagAppMaster == null) {
+      finishState = FinalApplicationStatus.UNDEFINED;
+      sb.append("App not yet initialized");
+    } else {
+      DAGAppMasterState appMasterState = dagAppMaster.getState();
+      if (appMasterState == DAGAppMasterState.SUCCEEDED) {
+        finishState = FinalApplicationStatus.SUCCEEDED;
+      } else if (appMasterState == DAGAppMasterState.KILLED
+          || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
+        finishState = FinalApplicationStatus.KILLED;
+      } else if (appMasterState == DAGAppMasterState.FAILED
+          || appMasterState == DAGAppMasterState.ERROR) {
+        finishState = FinalApplicationStatus.FAILED;
+      } else {
+        finishState = FinalApplicationStatus.UNDEFINED;
+      }
+      List<String> diagnostics = dagAppMaster.getDiagnostics();
+      if(diagnostics != null) {
+        for (String s : diagnostics) {
+          sb.append(s).append("\n");
+        }
+      }
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Setting job diagnostics to " + sb.toString());
+    }
+
+    // if history url is set use the same, if historyUrl is set to "" then rm ui disables the
+    // history url
+    return new AppFinalStatus(finishState, sb.toString(), historyUrl);
+  }
+
+
+
+  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
+  // TaskScheduler uses a separate thread for it's callbacks. Since this method
+  // returns a value which is required, the TaskScheduler wait for the call to
+  // complete and can hence lead to a deadlock if called from within a TSEH lock.
+  public float getProgress(int schedulerId) {
+    // at this point allocate has been called and so node count must be available
+    // may change after YARN-1722
+    // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
+    // node updates from the cluster.
+
+    // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
+
+    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
+    int nodeCount = taskSchedulers[0].getClusterNodeCount();
+    if (nodeCount != cachedNodeCount) {
+      cachedNodeCount = nodeCount;
+      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
+    }
+    return dagAppMaster.getProgress();
+  }
+
+  public void onError(int schedulerId, Throwable t) {
+    LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
+    sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
+  }
+
+  public void dagCompleted() {
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulers[i].dagComplete();
+    }
+  }
+
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+  public void preemptContainer(int schedulerId, ContainerId containerId) {
+    // TODO Why is this making a call back into the scheduler, when the call is originating from there.
+    // An AMContainer instance should already exist if an attempt is being made to preempt it
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+    // Inform the Containers about completion.
+    sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
+        "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
+  }
+
+  public void setShouldUnregisterFlag() {
+    LOG.info("TaskScheduler notified that it should unregister from RM");
+    this.shouldUnregisterFlag.set(true);
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (this.taskSchedulers[i] != null) {
+        this.taskSchedulers[i].setShouldUnregister();
+      }
+    }
+  }
+
+  public ContainerSignatureMatcher getContainerSignatureMatcher() {
+    return containerSignatureMatcher;
+  }
+
+  public boolean hasUnregistered() {
+    boolean result = true;
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      result = result & this.taskSchedulers[i].hasUnregistered();
+      if (result == false) {
+        return result;
+      }
+    }
+    return result;
+  }
+
+  @VisibleForTesting
+  public String getHistoryUrl() {
+    Configuration config = this.appContext.getAMConf();
+    String historyUrl = "";
+
+    String loggingClass =  config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
+    String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
+            TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
+    String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
+
+
+    if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
+        !historyUrlTemplate.isEmpty() &&
+        !historyUrlBase.isEmpty()) {
+      // replace the placeholders, while tolerating extra or missing "/" in input. replace all
+      // instances of consecutive "/" with single (except for the http(s):// case
+      historyUrl = historyUrlTemplate
+          .replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
+          .replaceAll(HISTORY_URL_BASE, historyUrlBase)
+          .replaceAll("([^:])/{2,}", "$1/");
+
+      // make sure we have a valid scheme
+      if (!historyUrl.startsWith("http")) {
+        historyUrl = "http://" + historyUrl;
+      }
+    }
+
+    return historyUrl;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 99cec2b..69c21d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -50,15 +50,15 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
@@ -81,7 +81,7 @@ public class AMContainerImpl implements AMContainer {
   private final Container container;
   private final AppContext appContext;
   private final ContainerHeartbeatHandler containerHeartbeatHandler;
-  private final TaskAttemptListener taskAttemptListener;
+  private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
   private final int schedulerId;
@@ -308,7 +308,7 @@ public class AMContainerImpl implements AMContainer {
   // Attempting to use a container based purely on reosurces required, etc needs
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
-      TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
+      TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher signatureMatcher,
       AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
@@ -319,7 +319,7 @@ public class AMContainerImpl implements AMContainer {
     this.signatureMatcher = signatureMatcher;
     this.appContext = appContext;
     this.containerHeartbeatHandler = chh;
-    this.taskAttemptListener = tal;
+    this.taskCommunicatorManagerInterface = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
     this.schedulerId = schedulerId;
     this.launcherId = launcherId;
@@ -466,7 +466,7 @@ public class AMContainerImpl implements AMContainer {
           containerContext.getLocalResources(),
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
-          container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
+          container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
           container.appContext, container.container.getResource(),
           container.appContext.getAMConf());
 
@@ -1095,28 +1095,28 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {
-    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
+    sendEvent(new ContainerLauncherLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
   }
 
   protected void sendStopRequestToNM() {
-    sendEvent(new NMCommunicatorStopRequestEvent(containerId,
+    sendEvent(new ContainerLauncherStopRequestEvent(containerId,
         container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
   }
 
   protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason, String diagnostics) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
+    taskCommunicatorManagerInterface.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
-    taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
+    taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
   }
 
   protected void registerWithTAListener() {
-    taskAttemptListener.registerRunningContainer(containerId, taskCommId);
+    taskCommunicatorManagerInterface.registerRunningContainer(containerId, taskCommId);
   }
 
   protected void unregisterFromTAListener(ContainerEndReason endReason, String diagnostics) {
-    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
+    this.taskCommunicatorManagerInterface.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index fcb9eaf..ab43db1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -31,19 +31,19 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 
 public class AMContainerMap extends AbstractService implements EventHandler<AMContainerEvent> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AMContainerMap.class);
 
   private final ContainerHeartbeatHandler chh;
-  private final TaskAttemptListener tal;
+  private final TaskCommunicatorManagerInterface tal;
   private final AppContext context;
   private final ContainerSignatureMatcher containerSignatureMatcher;
   private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
 
-  public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+  public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
       ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
     super("AMContainerMaps");
     this.chh = chh;

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b09eb86..3cab2da 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -60,8 +60,8 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
+import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -122,13 +122,13 @@ public class MockDAGAppMaster extends DAGAppMaster {
   // It can be used to preempt the container for a given task
   public class MockContainerLauncher extends ContainerLauncher implements Runnable {
 
-    BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
+    BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>();
     Thread eventHandlingThread;
     ListeningExecutorService executorService;
     
     Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap();
     ArrayBlockingQueue<Worker> workers;
-    TaskAttemptListenerImpTezDag taListener;
+    TaskCommunicatorManager taskCommunicatorManager;
     TezTaskCommunicatorImpl taskCommunicator;
     
     AtomicBoolean startScheduling = new AtomicBoolean(true);
@@ -187,8 +187,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
     
     @Override
     public void start() throws Exception {
-      taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
-      taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
+      taskCommunicatorManager = (TaskCommunicatorManager) getTaskCommunicatorManager();
+      taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0);
       eventHandlingThread = new Thread(this);
       eventHandlingThread.start();
       ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,
@@ -256,7 +256,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
     
     public void preemptContainer(ContainerData cData) {
-      getTaskSchedulerEventHandler().containerCompleted(0, null,
+      getTaskSchedulerManager().containerCompleted(0, null,
           ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
       cData.clear();
     }
@@ -495,7 +495,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       throw new TezUncheckedException(e);
     }
     containerLauncherContext =
-        new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener(), userPayload);
+        new ContainerLauncherContextImpl(getContext(), getTaskCommunicatorManager(), userPayload);
     containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
     this.initFailFlag = initFailFlag;
@@ -507,10 +507,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
   // use mock container launcher for tests
   @Override
-  protected ContainerLauncherRouter createContainerLauncherRouter(List<NamedEntityDescriptor> containerLauncherDescirptors,
-                                                                  boolean isLocal)
+  protected ContainerLauncherManager createContainerLauncherManager(
+      List<NamedEntityDescriptor> containerLauncherDescirptors,
+      boolean isLocal)
       throws UnknownHostException {
-    return new ContainerLauncherRouter(containerLauncher, getContext());
+    return new ContainerLauncherManager(containerLauncher, getContext());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 7584b4c..b0bc571 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -200,7 +200,7 @@ public class TestMockDAGAppMaster {
     mockLauncher.waitTillContainersLaunched();
     ContainerData cData = mockLauncher.getContainers().values().iterator().next();
     DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
-    mockApp.getTaskSchedulerEventHandler().preemptContainer(0, cData.cId);
+    mockApp.getTaskSchedulerManager().preemptContainer(0, cData.cId);
     
     mockLauncher.startScheduling(true);
     dagClient.waitForCompletion();