You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:25 UTC

[17/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index d4cf317..1e76dc9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -37,6 +37,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.math3.random.RandomDataGenerator;
@@ -59,10 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -80,17 +81,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
       eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
     }
  */
-public class YarnTaskSchedulerService extends TaskSchedulerService
+public class YarnTaskSchedulerService extends TaskScheduler
                              implements AMRMClientAsync.CallbackHandler {
   private static final Logger LOG = LoggerFactory.getLogger(YarnTaskSchedulerService.class);
 
 
 
   final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
-  final TaskSchedulerAppCallback realAppClient;
-  final TaskSchedulerAppCallback appClientDelegate;
   final ContainerSignatureMatcher containerSignatureMatcher;
-  ExecutorService appCallbackExecutor;
 
   // Container Re-Use configuration
   private boolean shouldReuseContainers;
@@ -131,7 +129,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   final String appHostName;
   final int appHostPort;
   final String appTrackingUrl;
-  final AppContext appContext;
   private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
 
   AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -150,6 +147,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
   
   RandomDataGenerator random = new RandomDataGenerator();
+  private final Configuration conf;
 
   @VisibleForTesting
   protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
@@ -213,51 +211,29 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
   }
 
-  public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
-                        ContainerSignatureMatcher containerSignatureMatcher,
-                        String appHostName,
-                        int appHostPort,
-                        String appTrackingUrl,
-                        AppContext appContext) {
-    super(YarnTaskSchedulerService.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
+  public YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+    super(taskSchedulerContext);
+    this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
     this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
+    this.appHostName = taskSchedulerContext.getAppHostName();
+    this.appHostPort = taskSchedulerContext.getAppClientPort();
+    this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+    this.conf = taskSchedulerContext.getInitialConfiguration();
   }
 
   @Private
   @VisibleForTesting
-  YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
-      ContainerSignatureMatcher containerSignatureMatcher,
-      String appHostName,
-      int appHostPort,
-      String appTrackingUrl,
-      TezAMRMClientAsync<CookieContainerRequest> client,
-      AppContext appContext) {
-    super(YarnTaskSchedulerService.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
+  YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext,
+      TezAMRMClientAsync<CookieContainerRequest> client) {
+    super(taskSchedulerContext);
+    this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
     this.amRmClient = client;
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
+    this.appHostName = taskSchedulerContext.getAppHostName();
+    this.appHostPort = taskSchedulerContext.getAppClientPort();
+    this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+    this.conf = taskSchedulerContext.getInitialConfiguration();
   }
 
-  @VisibleForTesting
-  ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-  
   @Override
   public Resource getAvailableResources() {
     return amRmClient.getAvailableResources();
@@ -269,12 +245,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     return amRmClient.getClusterNodeCount();
   }
 
-  TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
-  }
-
   @Override
   public void setShouldUnregister() {
     this.shouldUnregister.set(true);
@@ -287,8 +257,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   // AbstractService methods
   @Override
-  public synchronized void serviceInit(Configuration conf) {
+  public synchronized void initialize() {
 
+    // TODO Post TEZ-2003. Make all of these final fields.
     amRmClient.init(conf);
     int heartbeatIntervalMax = conf.getInt(
         TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
@@ -361,7 +332,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     try {
       RegisterApplicationMasterResponse response;
       synchronized (this) {
@@ -371,7 +342,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
                                                         appTrackingUrl);
       }
       // upcall to app outside locks
-      appClientDelegate.setApplicationRegistrationData(
+      getContext().setApplicationRegistrationData(
           response.getMaximumResourceCapability(),
           response.getApplicationACLs(),
           response.getClientToAMTokenMasterKey());
@@ -387,7 +358,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   }
 
   @Override
-  public void serviceStop() throws InterruptedException {
+  public void shutdown() throws InterruptedException {
     // upcall to app outside of locks
     try {
       delayedContainerManager.shutdown();
@@ -396,7 +367,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       synchronized (this) {
         isStopped.set(true);
         if (shouldUnregister.get()) {
-          AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+          AppFinalStatus status = getContext().getFinalAppStatus();
           LOG.info("Unregistering application from RM"
               + ", exitStatus=" + status.exitStatus
               + ", exitMessage=" + status.exitMessage
@@ -413,8 +384,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // operation and at the same time the callback operation might be trying
       // to get our lock.
       amRmClient.stop();
-      appCallbackExecutor.shutdown();
-      appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
     } catch (YarnException e) {
       LOG.error("Yarn Exception while unregistering ", e);
       throw new TezUncheckedException(e);
@@ -478,7 +447,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
     // upcall to app must be outside locks
     for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
-      appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
+      getContext().containerCompleted(entry.getKey(), entry.getValue());
     }
   }
 
@@ -528,7 +497,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private synchronized Map<CookieContainerRequest, Container>
       assignNewlyAllocatedContainers(Iterable<Container> containers) {
 
-    boolean amInCompletionState = appContext.isAMInCompletionState();
+    boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
     Map<CookieContainerRequest, Container> assignedContainers =
         new HashMap<CookieContainerRequest, Container>();
 
@@ -550,7 +519,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private synchronized Map<CookieContainerRequest, Container>
       tryAssignReUsedContainers(Iterable<Container> containers) {
 
-    boolean amInCompletionState = appContext.isAMInCompletionState();
+    boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
     Map<CookieContainerRequest, Container> assignedContainers =
       new HashMap<CookieContainerRequest, Container>();
 
@@ -590,7 +559,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private synchronized Map<CookieContainerRequest, Container>
       assignDelayedContainer(HeldContainer heldContainer) {
 
-    DAGAppMasterState state = appContext.getAMState();
+    AMState state = getContext().getAMState();
 
     boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
@@ -606,13 +575,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         + ", isNew=" + isNew);
     }
 
-    if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
+    if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) {
       // reset locality level on held container
       // if sessionDelay defined, push back into delayed queue if not already
       // done so
 
       // Compute min held containers.
-      if (appContext.isSession() && sessionNumMinHeldContainers > 0 &&
+      if (getContext().isSession() && sessionNumMinHeldContainers > 0 &&
           sessionMinHeldContainers.isEmpty()) {
         // session mode and need to hold onto containers and not done so already
         determineMinHeldContainers();
@@ -626,7 +595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           && idleContainerTimeoutMin != -1)) {
         // container idle timeout has expired or is a new unused container. 
         // new container is possibly a spurious race condition allocation.
-        if (appContext.isSession()
+        if (getContext().isSession()
             && sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) {
           // There are no outstanding requests. So its safe to hold new containers.
           // We may have received more containers than necessary and some are unused
@@ -667,7 +636,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             heldContainer.getContainer(), currentTime
                 + localitySchedulingDelay);        
       }
-    } else if (state.equals(DAGAppMasterState.RUNNING)) {
+    } else if (state.equals(AMState.RUNNING_APP)) {
       // clear min held containers since we need to allocate to tasks
       if (!sessionMinHeldContainers.isEmpty()) {
         // update the expire time of min held containers so that they are
@@ -806,12 +775,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             // Are there any pending requests at any priority?
             // release if there are tasks or this is not a session
             if (safeToRelease && 
-                (!taskRequests.isEmpty() || !appContext.isSession())) {
+                (!taskRequests.isEmpty() || !getContext().isSession())) {
               LOG.info("Releasing held container as either there are pending but "
                 + " unmatched requests or this is not a session"
                 + ", containerId=" + heldContainer.container.getId()
                 + ", pendingTasks=" + taskRequests.size()
-                + ", isSession=" + appContext.isSession()
+                + ", isSession=" + getContext().isSession()
                 + ". isNew=" + isNew);
               releaseUnassignedContainers(
                 Lists.newArrayList(heldContainer.container));
@@ -862,7 +831,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       return;
     }
     // upcall to app must be outside locks
-    appClientDelegate.appShutdownRequested();
+    getContext().appShutdownRequested();
   }
 
   @Override
@@ -872,7 +841,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
     // ignore bad nodes for now
     // upcall to app must be outside locks
-    appClientDelegate.nodesUpdated(updatedNodes);
+    getContext().nodesUpdated(updatedNodes);
   }
 
   @Override
@@ -894,7 +863,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     numHeartbeats++;
     preemptIfNeeded();
 
-    return appClientDelegate.getProgress();
+    return getContext().getProgress();
   }
 
   @Override
@@ -902,7 +871,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     if (isStopped.get()) {
       return;
     }
-    appClientDelegate.onError(t);
+    getContext().onError(t);
   }
 
   @Override
@@ -1289,7 +1258,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         ContainerId cId = preemptedContainers[i];
         if (cId != null) {
           LOG.info("Preempting container: " + cId + " currently allocated to a task.");
-          appClientDelegate.preemptContainer(cId);
+          getContext().preemptContainer(cId);
         }
       }
     }
@@ -1422,7 +1391,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     Object assignedTask = containerAssignments.remove(containerId);
     if (assignedTask != null) {
       // A task was assigned to this container at some point. Inform the app.
-      appClientDelegate.containerBeingReleased(containerId);
+      getContext().containerBeingReleased(containerId);
     }
     HeldContainer delayedContainer = heldContainers.remove(containerId);
     if (delayedContainer != null) {
@@ -1626,7 +1595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   private void informAppAboutAssignment(CookieContainerRequest assigned,
       Container container) {
-    appClientDelegate.taskAllocated(getTask(assigned),
+    getContext().taskAllocated(getTask(assigned),
         assigned.getCookie().getAppCookie(), container);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 5cff766..aeacf84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 938096d..fcb9eaf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.service.AbstractService;

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
index 211c537..436f098 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.dag.app.ContainerContext;
 
 import com.google.common.base.Preconditions;
+import org.apache.tez.common.ContainerSignatureMatcher;
 
 public class ContainerContextMatcher implements ContainerSignatureMatcher {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
deleted file mode 100644
index 0f9c2d6..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm.container;
-
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-
-public interface ContainerSignatureMatcher {
-  /**
-   * Checks the compatibility between the specified container signatures.
-   *
-   * @return true if the first signature is a super set of the second
-   *         signature.
-   */
-  public boolean isSuperSet(Object cs1, Object cs2);
-  
-  /**
-   * Checks if the container signatures match exactly
-   * @return true if exact match
-   */
-  public boolean isExactMatch(Object cs1, Object cs2);
-  
-  /**
-   * Gets additional resources specified in lr2, which are not present for lr1
-   * 
-   * @param lr1
-   * @param lr2
-   * @return additional resources specified in lr2, which are not present for lr1
-   */
-  public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1,
-      Map<String, LocalResource> lr2);
-
-
-  /**
-   * Do a union of 2 signatures
-   * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2.
-   * i.e. isSuperSet should not return false.
-   * @param cs1 Signature 1 Original signature
-   * @param cs2 Signature 2 New signature
-   * @return Union of 2 signatures
-   */
-  public Object union(Object cs1, Object cs2);
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index e37ab4a..88f6066 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -68,16 +66,14 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -116,14 +112,13 @@ public class TestContainerReuse {
     conf.setBoolean(
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     conf.setBoolean(
-      TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
     conf.setBoolean(
-      TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
     conf.setLong(
       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
@@ -132,12 +127,6 @@ public class TestContainerReuse {
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
       spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(conf).when(appContext).getAMConf();
@@ -161,11 +150,11 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(conf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-      (TaskSchedulerWithDrainableAppCallback)
+    TaskSchedulerWithDrainableContext taskScheduler =
+      (TaskSchedulerWithDrainableContext)
         ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback =
+    TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
 
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -251,8 +240,7 @@ public class TestContainerReuse {
       }
     }
     assertTrue("containerHost2 was not released", exception == null);
-    taskScheduler.stop();
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -267,7 +255,6 @@ public class TestContainerReuse {
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
@@ -276,12 +263,6 @@ public class TestContainerReuse {
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
       spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -304,11 +285,11 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(conf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-      (TaskSchedulerWithDrainableAppCallback)
+    TaskSchedulerWithDrainableContext taskScheduler =
+      (TaskSchedulerWithDrainableContext)
         ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
           .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback =
+    TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
     
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -366,8 +347,7 @@ public class TestContainerReuse {
       eq(containerHost2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
-    taskScheduler.stop();
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -380,19 +360,12 @@ public class TestContainerReuse {
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -410,9 +383,9 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -504,7 +477,7 @@ public class TestContainerReuse {
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -522,19 +495,11 @@ public class TestContainerReuse {
     tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
     TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption =  new TaskSpecificLaunchCmdOption(tezConf);
 
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -554,10 +519,10 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-        (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler =
+        (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
           .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -705,7 +670,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
     eventHandler.reset();
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -721,20 +686,12 @@ public class TestContainerReuse {
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 1000l);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l);
 
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
         spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -758,11 +715,11 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-      (TaskSchedulerWithDrainableAppCallback)
+    TaskSchedulerWithDrainableContext taskScheduler =
+      (TaskSchedulerWithDrainableContext)
         ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback =
+    TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -836,7 +793,7 @@ public class TestContainerReuse {
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -853,20 +810,12 @@ public class TestContainerReuse {
     tezConf.setInt(
         TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
 
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
       spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -890,11 +839,11 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-      (TaskSchedulerWithDrainableAppCallback)
+    TaskSchedulerWithDrainableContext taskScheduler =
+      (TaskSchedulerWithDrainableContext)
         ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
           .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
 
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -965,7 +914,7 @@ public class TestContainerReuse {
     // container should not get released due to min held containers
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
   
@@ -979,19 +928,11 @@ public class TestContainerReuse {
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
 
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1011,9 +952,9 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -1129,7 +1070,7 @@ public class TestContainerReuse {
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -1143,19 +1084,11 @@ public class TestContainerReuse {
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
 
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1177,9 +1110,9 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -1291,7 +1224,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
     eventHandler.reset();
 
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 
@@ -1305,19 +1238,12 @@ public class TestContainerReuse {
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
-    String appUrl = "url";
-    String appMsg = "success";
-    AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
-    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
 
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1326,7 +1252,7 @@ public class TestContainerReuse {
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
-    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(DAGAppMasterState.SUCCEEDED).when(appContext).getAMState();
     doReturn(true).when(appContext).isAMInCompletionState();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -1338,10 +1264,10 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
 
-    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
         ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -1369,7 +1295,7 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
         any(Object.class), eq(container1));
-    taskScheduler.close();
+    taskScheduler.shutdown();
     taskSchedulerEventHandler.close();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 12390b2..2ada2f1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -18,12 +18,12 @@
 
 package org.apache.tez.dag.app.rm;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,24 +33,12 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler;
 import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory;
 import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.TaskRequest;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 
 public class TestLocalTaskScheduler {
 
-  public AppContext createMockAppContext() {
-
-    ApplicationId appId = ApplicationId.newInstance(2000, 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-
-    AppContext appContext = mock(AppContext.class);
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-
-    return appContext;
-  }
 
   @Test(timeout = 5000)
   public void maxTasksAllocationsCannotBeExceeded() {
@@ -59,17 +47,24 @@ public class TestLocalTaskScheduler {
     TezConfiguration tezConf = new TezConfiguration();
     tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS);
 
-    LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000);
+    ApplicationId appId = ApplicationId.newInstance(2000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    TaskSchedulerContext
+        mockContext = TestTaskSchedulerHelpers.setupMockTaskSchedulerContext("", 0, "", true,
+        appAttemptId, 1000l, null, new Configuration());
+
+    LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000);
+
     HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>();
     PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
-    TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class);
 
     // Object under test
     AsyncDelegateRequestHandler requestHandler =
       new AsyncDelegateRequestHandler(taskRequestQueue,
           containerFactory,
           taskAllocations,
-          appClientDelegate,
+          mockContext,
           tezConf);
 
     // Allocate up to max tasks

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index b555c62..c637f5f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,11 +27,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -82,14 +80,15 @@ public class TestLocalTaskSchedulerService {
    * Normal flow of TaskAttempt
    */
   @Test(timeout = 5000)
-  public void testDeallocationBeforeAllocation() {
-    AppContext appContext = mock(AppContext.class);
+  public void testDeallocationBeforeAllocation() throws InterruptedException {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
-        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
-    taskSchedulerService.init(new Configuration());
+
+    TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
+        .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());
+
+    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce(mockContext);
+    taskSchedulerService.initialize();
     taskSchedulerService.start();
 
     Task task = mock(Task.class);
@@ -103,21 +102,24 @@ public class TestLocalTaskSchedulerService {
     assertEquals(1, requestHandler.deallocateCount);
     // The corresponding AllocateTaskRequest will be removed, so won't been processed.
     assertEquals(0, requestHandler.allocateCount);
-    taskSchedulerService.stop();
+    taskSchedulerService.shutdown();
   }
 
   /**
    * TaskAttempt Killed from START_WAIT
    */
   @Test(timeout = 5000)
-  public void testDeallocationAfterAllocation() {
-    AppContext appContext = mock(AppContext.class);
+  public void testDeallocationAfterAllocation() throws InterruptedException {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
-        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
-    taskSchedulerService.init(new Configuration());
+
+    TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
+        .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());
+
+    MockLocalTaskSchedulerSerivce taskSchedulerService =
+        new MockLocalTaskSchedulerSerivce(mockContext);
+
+    taskSchedulerService.initialize();
     taskSchedulerService.start();
 
     Task task = mock(Task.class);
@@ -130,33 +132,29 @@ public class TestLocalTaskSchedulerService {
     requestHandler.drainRequest(2);
     assertEquals(1, requestHandler.deallocateCount);
     assertEquals(1, requestHandler.allocateCount);
-    taskSchedulerService.stop();
+    taskSchedulerService.shutdown();
   }
 
   static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService {
 
     private MockAsyncDelegateRequestHandler requestHandler;
 
-    public MockLocalTaskSchedulerSerivce(TaskSchedulerAppCallback appClient,
-        ContainerSignatureMatcher containerSignatureMatcher,
-        String appHostName, int appHostPort, String appTrackingUrl,
-        AppContext appContext) {
-      super(appClient, containerSignatureMatcher, appHostName, appHostPort,
-          appTrackingUrl, 10000l, appContext);
+    public MockLocalTaskSchedulerSerivce(TaskSchedulerContext appClient) {
+      super(appClient);
     }
 
     @Override
     public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
       requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
-          new LocalContainerFactory(appContext, customContainerAppId),
+          new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
           taskAllocations,
-          appClientDelegate,
+          getContext(),
           conf);
       return requestHandler;
     }
 
     @Override
-    public void serviceStart() {
+    public void start() {
       // don't start RequestHandler thread, control it in unit test
     }
 
@@ -178,7 +176,7 @@ public class TestLocalTaskSchedulerService {
           BlockingQueue<TaskRequest> taskRequestQueue,
           LocalContainerFactory localContainerFactory,
           HashMap<Object, Container> taskAllocations,
-          TaskSchedulerAppCallback appClientDelegate, Configuration conf) {
+          TaskSchedulerContext appClientDelegate, Configuration conf) {
         super(taskRequestQueue, localContainerFactory, taskAllocations,
             appClientDelegate, conf);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 807e772..123a4d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.rm;
 
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.createCountingExecutingService;
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.setupMockTaskSchedulerContext;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -42,8 +44,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -59,23 +64,21 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -88,23 +91,39 @@ import com.google.common.collect.Sets;
 
 public class TestTaskScheduler {
 
-  RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-
   static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher();
+  private ExecutorService contextCallbackExecutor;
 
   @BeforeClass
   public static void beforeClass() {
     MockDNSToSwitchMapping.initializeMockRackResolver();
   }
 
+  @Before
+  public void preTest() {
+    contextCallbackExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
+            .setDaemon(true)
+            .build());
+  }
+
+  @After
+  public void postTest() {
+    contextCallbackExecutor.shutdownNow();
+  }
+
+  private TaskSchedulerContextDrainable createDrainableContext(
+      TaskSchedulerContext taskSchedulerContext) {
+    TaskSchedulerContextImplWrapper wrapper =
+        new TaskSchedulerContextImplWrapper(taskSchedulerContext,
+            createCountingExecutingService(contextCallbackExecutor));
+    return new TaskSchedulerContextDrainable(wrapper);
+  }
+
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerNoReuse() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -112,18 +131,19 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskSchedulerWithDrainableAppCallback scheduler =
-      new TaskSchedulerWithDrainableAppCallback(
-        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
-        appUrl, mockRMClient, mockAppContext);
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
-        .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     int interval = 100;
     conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
-    scheduler.init(conf);
+
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    TaskSchedulerWithDrainableContext scheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+    scheduler.initialize();
     drainableAppCallback.drain();
     verify(mockRMClient).init(conf);
     verify(mockRMClient).setHeartbeatInterval(interval);
@@ -495,22 +515,18 @@ public class TestTaskScheduler {
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
-    scheduler.stop();
+    scheduler.shutdown();
     drainableAppCallback.drain();
     verify(mockRMClient).
                   unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-                                              appMsg, appUrl);
+                      appMsg, appUrl);
     verify(mockRMClient).stop();
-    scheduler.close();
   }
   
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerWithReuse() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -518,12 +534,6 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskSchedulerWithDrainableAppCallback scheduler =
-      new TaskSchedulerWithDrainableAppCallback(
-        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
-        appUrl, mockRMClient, mockAppContext);
-    final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
-        .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
     // to match all in the same pass
@@ -531,7 +541,15 @@ public class TestTaskScheduler {
     // to release immediately after deallocate
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
-    scheduler.init(conf);
+
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    TaskSchedulerWithDrainableContext scheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+
+    scheduler.initialize();
     drainableAppCallback.drain();
 
     RegisterApplicationMasterResponse mockRegResponse =
@@ -992,23 +1010,18 @@ public class TestTaskScheduler {
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
-    scheduler.stop();
+    scheduler.shutdown();
     drainableAppCallback.drain();
     verify(mockRMClient).
                   unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-                                              appMsg, appUrl);
+                      appMsg, appUrl);
     verify(mockRMClient).stop();
-    scheduler.close();
   }
   
   @SuppressWarnings("unchecked")
   @Test (timeout=5000)
   public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
-    when(mockAppContext.isSession()).thenReturn(true);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -1016,15 +1029,15 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskSchedulerWithDrainableAppCallback scheduler =
-      new TaskSchedulerWithDrainableAppCallback(
-        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
-        appUrl, mockRMClient, mockAppContext);
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
-        .getDrainableAppCallback();
 
-    Configuration conf = new Configuration();
-    scheduler.init(conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true,
+        new Configuration());
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    TaskSchedulerWithDrainableContext scheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+    scheduler.initialize();
     RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
     Resource mockMaxResource = mock(Resource.class);
     Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
@@ -1176,17 +1189,13 @@ public class TestTaskScheduler {
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
-    scheduler.stop();
-    scheduler.close();
+    scheduler.shutdown();
   }
   
   @SuppressWarnings("unchecked")
   @Test(timeout=5000)
   public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -1194,25 +1203,31 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskSchedulerWithDrainableAppCallback scheduler1 =
-      new TaskSchedulerWithDrainableAppCallback(
-        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
-        appUrl, mockRMClient, mockAppContext);
-    TaskSchedulerWithDrainableAppCallback scheduler2 =
-        new TaskSchedulerWithDrainableAppCallback(
-          mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
-          appUrl, mockRMClient, mockAppContext);
 
     long minTime = 1000l;
     long maxTime = 100000l;
     Configuration conf1 = new Configuration();
     conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
     conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, minTime);
-    scheduler1.init(conf1);
+
     Configuration conf2 = new Configuration();
     conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
     conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime);
-    scheduler2.init(conf2);
+
+    TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1);
+    TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2);
+    final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1);
+    final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2);
+
+
+    TaskSchedulerWithDrainableContext scheduler1 =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback1, mockRMClient);
+    TaskSchedulerWithDrainableContext scheduler2 =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback2, mockRMClient);
+
+    scheduler1.initialize();
+    scheduler2.initialize();
+
 
     RegisterApplicationMasterResponse mockRegResponse =
                                 mock(RegisterApplicationMasterResponse.class);
@@ -1250,20 +1265,16 @@ public class TestTaskScheduler {
     String appMsg = "success";
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
-    scheduler1.stop();
-    scheduler1.close();
-    scheduler2.stop();
-    scheduler2.close();
+    when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus);
+    when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler1.shutdown();
+    scheduler2.shutdown();
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test (timeout=5000)
   public void testTaskSchedulerPreemption() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -1271,16 +1282,18 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    final TaskSchedulerWithDrainableAppCallback scheduler =
-      new TaskSchedulerWithDrainableAppCallback(
-        mockApp, new PreemptionMatcher(), appHost, appPort,
-        appUrl, mockRMClient, mockAppContext);
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
-        .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
-    scheduler.init(conf);
+
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
+        null, null, new PreemptionMatcher(), conf);
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+    final TaskSchedulerWithDrainableContext scheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+    scheduler.initialize();
 
     RegisterApplicationMasterResponse mockRegResponse =
                        mock(RegisterApplicationMasterResponse.class);
@@ -1530,7 +1543,7 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     scheduler.getProgress();
     scheduler.getProgress();
-    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
+    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any());
     scheduler.getProgress();
     drainableAppCallback.drain();
     // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks
@@ -1540,9 +1553,8 @@ public class TestTaskScheduler {
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
-    scheduler.stop();
+    scheduler.shutdown();
     drainableAppCallback.drain();
-    scheduler.close();
   }
 
   @SuppressWarnings("unchecked")
@@ -1550,22 +1562,19 @@ public class TestTaskScheduler {
   public void testLocalityMatching() throws Exception {
 
     RackResolver.init(new Configuration());
-    TaskSchedulerAppCallback appClient = mock(TaskSchedulerAppCallback.class);
     TezAMRMClientAsync<CookieContainerRequest> amrmClient =
       mock(TezAMRMClientAsync.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
-
-    TaskSchedulerWithDrainableAppCallback taskScheduler =
-      new TaskSchedulerWithDrainableAppCallback(
-        appClient, new AlwaysMatchesContainerMatcher(), "host", 0, "",
-        amrmClient, mockAppContext);
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler
-        .getDrainableAppCallback();
-    
+
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
-    taskScheduler.init(conf);
+
+    TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf);
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient);
+
+    TaskSchedulerWithDrainableContext taskScheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient);
+
+    taskScheduler.initialize();
     
     RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
     Resource mockMaxResource = mock(Resource.class);
@@ -1693,7 +1702,7 @@ public class TestTaskScheduler {
     AppFinalStatus finalStatus = new AppFinalStatus(
         FinalApplicationStatus.SUCCEEDED, "", "");
     when(appClient.getFinalAppStatus()).thenReturn(finalStatus);
-    taskScheduler.close();
+    taskScheduler.shutdown();
   }
   
   @Test (timeout=5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 005692e..3ea0446 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
 import org.apache.tez.dag.app.dag.impl.TaskImpl;
 import org.apache.tez.dag.app.dag.impl.VertexImpl;
@@ -61,10 +63,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.web.WebUIService;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,6 +98,7 @@ public class TestTaskSchedulerEventHandler {
     protected void instantiateScheduelrs(String host, int port, String trackingUrl,
                                          AppContext appContext) {
       taskSchedulers[0] = mockTaskScheduler;
+      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
     }
     
     @Override
@@ -113,7 +116,7 @@ public class TestTaskSchedulerEventHandler {
   TestEventHandler mockEventHandler;
   ContainerSignatureMatcher mockSigMatcher;
   MockTaskSchedulerEventHandler schedulerHandler;
-  TaskSchedulerService mockTaskScheduler;
+  TaskScheduler mockTaskScheduler;
   AMContainerMap mockAMContainerMap;
   WebUIService mockWebUIService;
 
@@ -124,7 +127,7 @@ public class TestTaskSchedulerEventHandler {
     mockClientService = mock(DAGClientServer.class);
     mockEventHandler = new TestEventHandler();
     mockSigMatcher = mock(ContainerSignatureMatcher.class);
-    mockTaskScheduler = mock(TaskSchedulerService.class);
+    mockTaskScheduler = mock(TaskScheduler.class);
     mockAMContainerMap = mock(AMContainerMap.class);
     mockWebUIService = mock(WebUIService.class);
     when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 04610ab..966c95a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -40,9 +40,13 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -54,13 +58,12 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 class TestTaskSchedulerHelpers {
 
@@ -134,12 +137,19 @@ class TestTaskSchedulerHelpers {
 
     @Override
     public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
-      taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0),
-          containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
-          appContext);
-    }
-
-    public TaskSchedulerService getSpyTaskScheduler() {
+      TaskSchedulerContext taskSchedulerContext =
+          new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
+              getConfig());
+      TaskSchedulerContextImplWrapper wrapper =
+          new TaskSchedulerContextImplWrapper(taskSchedulerContext,
+              new CountingExecutorService(appCallbackExecutor));
+      TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper);
+      taskSchedulers[0] =
+          new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync);
+      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
+    }
+
+    public TaskScheduler getSpyTaskScheduler() {
       return taskSchedulers[0];
     }
 
@@ -147,8 +157,8 @@ class TestTaskSchedulerHelpers {
     public void serviceStart() {
       instantiateScheduelrs("host", 0, "", appContext);
       // Init the service so that reuse configuration is picked up.
-      ((AbstractService)taskSchedulers[0]).init(getConfig());
-      ((AbstractService)taskSchedulers[0]).start();
+      ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
+      ((AbstractService)taskSchedulerServiceWrappers[0]).start();
       taskSchedulers[0] = spy(taskSchedulers[0]);
     }
 
@@ -188,61 +198,31 @@ class TestTaskSchedulerHelpers {
     }
   }
 
-  static class TaskSchedulerWithDrainableAppCallback extends YarnTaskSchedulerService {
+  static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService {
 
-    private TaskSchedulerAppCallbackDrainable drainableAppCallback;
 
-    public TaskSchedulerWithDrainableAppCallback(
-        TaskSchedulerAppCallback appClient,
-        ContainerSignatureMatcher containerSignatureMatcher,
-        String appHostName, int appHostPort, String appTrackingUrl,
-        AppContext appContext) {
-      super(appClient, containerSignatureMatcher, appHostName, appHostPort,
-          appTrackingUrl, appContext);
+    public TaskSchedulerWithDrainableContext(
+        TaskSchedulerContextDrainable appClient,
+        TezAMRMClientAsync<CookieContainerRequest> client) {
+      super(appClient, client);
       shouldUnregister.set(true);
     }
 
-    public TaskSchedulerWithDrainableAppCallback(
-        TaskSchedulerAppCallback appClient,
-        ContainerSignatureMatcher containerSignatureMatcher,
-        String appHostName, int appHostPort, String appTrackingUrl,
-        TezAMRMClientAsync<CookieContainerRequest> client,
-        AppContext appContext) {
-      super(appClient, containerSignatureMatcher, appHostName, appHostPort,
-          appTrackingUrl, client, appContext);
-      shouldUnregister.set(true);
-    }
-
-    @Override
-    TaskSchedulerAppCallback createAppCallbackDelegate(
-        TaskSchedulerAppCallback realAppClient) {
-      drainableAppCallback = new TaskSchedulerAppCallbackDrainable(
-          new TaskSchedulerAppCallbackWrapper(realAppClient,
-              appCallbackExecutor));
-      return drainableAppCallback;
-    }
-    
-    @Override
-    ExecutorService createAppCallbackExecutorService() {
-      ExecutorService real = super.createAppCallbackExecutorService();
-      return new CountingExecutorService(real);
-    }
-
-    public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() {
-      return drainableAppCallback;
+    public TaskSchedulerContextDrainable getDrainableAppCallback() {
+      return (TaskSchedulerContextDrainable)getContext();
     }
   }
 
   @SuppressWarnings("rawtypes")
-  static class TaskSchedulerAppCallbackDrainable implements TaskSchedulerAppCallback {
+  static class TaskSchedulerContextDrainable implements TaskSchedulerContext {
     int completedEvents;
     int invocations;
-    private TaskSchedulerAppCallback real;
+    private TaskSchedulerContext real;
     private CountingExecutorService countingExecutorService;
     final AtomicInteger count = new AtomicInteger(0);
     
-    public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
-      countingExecutorService = (CountingExecutorService) real.executorService;
+    public TaskSchedulerContextDrainable(TaskSchedulerContextImplWrapper real) {
+      countingExecutorService = (CountingExecutorService) real.getExecutorService();
       this.real = real;
     }
 
@@ -303,6 +283,53 @@ class TestTaskSchedulerHelpers {
       return real.getFinalAppStatus();
     }
 
+    // Not incrementing invocations for methods which do not obtain locks,
+    // and do not go via the executor service.
+    @Override
+    public Configuration getInitialConfiguration() {
+      return real.getInitialConfiguration();
+    }
+
+    @Override
+    public String getAppTrackingUrl() {
+      return real.getAppTrackingUrl();
+    }
+
+    @Override
+    public long getCustomClusterIdentifier() {
+      return real.getCustomClusterIdentifier();
+    }
+
+    @Override
+    public ContainerSignatureMatcher getContainerSignatureMatcher() {
+      return real.getContainerSignatureMatcher();
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return real.getApplicationAttemptId();
+    }
+
+    @Override
+    public String getAppHostName() {
+      return real.getAppHostName();
+    }
+
+    @Override
+    public int getAppClientPort() {
+      return real.getAppClientPort();
+    }
+
+    @Override
+    public boolean isSession() {
+      return real.isSession();
+    }
+
+    @Override
+    public AMState getAMState() {
+      return real.getAMState();
+    }
+
     @Override
     public void preemptContainer(ContainerId cId) {
       invocations++;
@@ -384,7 +411,11 @@ class TestTaskSchedulerHelpers {
       }
     }
   }
-  
+
+  static CountingExecutorService createCountingExecutingService(ExecutorService rawExecutor) {
+    return new CountingExecutorService(rawExecutor);
+  }
+
   @SuppressWarnings({"rawtypes", "unchecked"})
   private static class CountingExecutorService implements ExecutorService {
 
@@ -464,7 +495,50 @@ class TestTaskSchedulerHelpers {
         throws InterruptedException, ExecutionException, TimeoutException {
       throw new UnsupportedOperationException("Not expected to be used");
     }
-    
+  }
+
+  static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+                                                            String appUrl, Configuration conf) {
+    return setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, conf);
+  }
+
+  static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+                                                            String appUrl, boolean isSession,
+                                                            Configuration conf) {
+    return setupMockTaskSchedulerContext(appHost, appPort, appUrl, isSession, null, null, null,
+        conf);
+  }
+
+  static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+                                                            String appUrl, boolean isSession,
+                                                            ApplicationAttemptId appAttemptId,
+                                                            Long customAppIdentifier,
+                                                            ContainerSignatureMatcher containerSignatureMatcher,
+                                                            Configuration conf) {
+
+    TaskSchedulerContext mockContext = mock(TaskSchedulerContext.class);
+    when(mockContext.getAppHostName()).thenReturn(appHost);
+    when(mockContext.getAppClientPort()).thenReturn(appPort);
+    when(mockContext.getAppTrackingUrl()).thenReturn(appUrl);
+
+    when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP);
+    when(mockContext.getInitialConfiguration()).thenReturn(conf);
+    when(mockContext.isSession()).thenReturn(isSession);
+    if (containerSignatureMatcher != null) {
+      when(mockContext.getContainerSignatureMatcher())
+          .thenReturn(containerSignatureMatcher);
+    } else {
+      when(mockContext.getContainerSignatureMatcher())
+          .thenReturn(new AlwaysMatchesContainerMatcher());
+    }
+    if (appAttemptId != null) {
+      when(mockContext.getApplicationAttemptId()).thenReturn(appAttemptId);
+    }
+    if (customAppIdentifier != null) {
+      when(mockContext.getCustomClusterIdentifier()).thenReturn(customAppIdentifier);
+    }
+
+    return mockContext;
   }
 
 }