You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/06/26 19:26:08 UTC

[1/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an Interface. Contributed by Jeff Zhang.

Repository: incubator-tez
Updated Branches:
  refs/heads/master af538639c -> 4dfd8341d


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 5a0856b..b0ea644 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 
 import com.google.common.base.Preconditions;
@@ -133,20 +133,20 @@ class TestTaskSchedulerHelpers {
     }
 
     @Override
-    public TaskSchedulerInterface createTaskScheduler(String host, int port,
+    public TaskSchedulerService createTaskScheduler(String host, int port,
         String trackingUrl, AppContext appContext) {
       return new TaskSchedulerWithDrainableAppCallback(this,
           containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
           appContext);
     }
 
-    public TaskSchedulerInterface getSpyTaskScheduler() {
+    public TaskSchedulerService getSpyTaskScheduler() {
       return this.taskScheduler;
     }
 
     @Override
     public void serviceStart() {
-      TaskSchedulerInterface taskSchedulerReal = createTaskScheduler("host", 0, "",
+      TaskSchedulerService taskSchedulerReal = createTaskScheduler("host", 0, "",
         appContext);
       // Init the service so that reuse configuration is picked up.
       ((AbstractService)taskSchedulerReal).init(getConfig());
@@ -190,7 +190,7 @@ class TestTaskSchedulerHelpers {
     }
   }
 
-  static class TaskSchedulerWithDrainableAppCallback extends TaskScheduler {
+  static class TaskSchedulerWithDrainableAppCallback extends YarnTaskSchedulerService {
 
     private TaskSchedulerAppCallbackDrainable drainableAppCallback;
 


[4/4] git commit: TEZ-1218. Make TaskScheduler an Abstract class instead of an Interface. Contributed by Jeff Zhang.

Posted by ss...@apache.org.
TEZ-1218. Make TaskScheduler an Abstract class instead of an Interface.
Contributed by Jeff Zhang.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4dfd8341
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4dfd8341
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4dfd8341

Branch: refs/heads/master
Commit: 4dfd8341d8642dea418195beb5d3778d3bcf35a9
Parents: af53863
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 26 10:25:27 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 26 10:25:27 2014 -0700

----------------------------------------------------------------------
 .../tez/dag/app/rm/LocalTaskScheduler.java      |  337 ---
 .../dag/app/rm/LocalTaskSchedulerService.java   |  335 +++
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 1985 ------------------
 .../app/rm/TaskSchedulerAppCallbackWrapper.java |    2 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   14 +-
 .../tez/dag/app/rm/TaskSchedulerInterface.java  |   56 -
 .../tez/dag/app/rm/TaskSchedulerService.java    |  106 +
 .../dag/app/rm/YarnTaskSchedulerService.java    | 1951 +++++++++++++++++
 .../tez/dag/app/rm/TestContainerReuse.java      |    6 +-
 .../tez/dag/app/rm/TestLocalTaskScheduler.java  |    8 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       |   14 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   12 +-
 12 files changed, 2420 insertions(+), 2406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
deleted file mode 100644
index 87bfccd..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class LocalTaskScheduler extends AbstractService implements TaskSchedulerInterface {
-
-  private static final Log LOG = LogFactory.getLog(LocalTaskScheduler.class);
-
-  final TaskSchedulerAppCallback realAppClient;
-  final TaskSchedulerAppCallback appClientDelegate;
-  final ContainerSignatureMatcher containerSignatureMatcher;
-  final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
-  AsyncDelegateRequestHandler taskRequestHandler;
-  Thread asyncDelegateRequestThread;
-  final ExecutorService appCallbackExecutor;
-
-  final HashMap<Object, Container> taskAllocations;
-  final String appHostName;
-  final int appHostPort;
-  final String appTrackingUrl;
-  final AppContext appContext;
-
-  public LocalTaskScheduler(TaskSchedulerAppCallback appClient,
-      ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
-      int appHostPort, String appTrackingUrl, AppContext appContext) {
-    super(LocalTaskScheduler.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
-    taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
-    taskAllocations = new LinkedHashMap<Object, Container>();
-  }
-
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-
-  private TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
-  }
-
-  @Override
-  public Resource getAvailableResources() {
-    Resource freeResources = Resource.newInstance(
-        (int)Runtime.getRuntime().freeMemory()/(1024*1024),
-        Runtime.getRuntime().availableProcessors());
-    return freeResources;
-  }
-
-  @Override
-  public int getClusterNodeCount() {
-    return 1;
-  }
-
-  @Override
-  public void resetMatchLocalityForAllHeldContainers() {
-  }
-
-  @Override
-  public Resource getTotalResources() {
-    Resource totalResources = Resource.newInstance(
-        (int)Runtime.getRuntime().maxMemory()/(1024*1024),
-        Runtime.getRuntime().availableProcessors());
-    return totalResources;
-  }
-
-  @Override
-  public void blacklistNode(NodeId nodeId) {
-  }
-
-  @Override
-  public void unblacklistNode(NodeId nodeId) {
-  }
-
-  @Override
-  public void allocateTask(Object task, Resource capability, String[] hosts,
-      String[] racks, Priority priority, Object containerSignature,
-      Object clientCookie) {
-    taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
-  }
-
-  @Override
-  public synchronized void allocateTask(Object task, Resource capability,
-      ContainerId containerId, Priority priority, Object containerSignature,
-      Object clientCookie) {
-    // in local mode every task is already container level local
-    taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
-  }
-  
-  @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
-    return taskRequestHandler.addDeallocateTaskRequest(task);
-  }
-
-  @Override
-  public Object deallocateContainer(ContainerId containerId) {
-    return null;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) {
-      taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
-        new LocalContainerFactory(appContext),
-        taskAllocations,
-        appClientDelegate,
-        conf);
-    asyncDelegateRequestThread = new Thread(taskRequestHandler);
-  }
-
-  @Override
-  public void serviceStart() {
-    asyncDelegateRequestThread.start();
-  }
-
-  @Override
-  public void serviceStop() throws InterruptedException {
-    if (asyncDelegateRequestThread != null) {
-      asyncDelegateRequestThread.interrupt();
-    }
-    appCallbackExecutor.shutdownNow();
-    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  public void setShouldUnregister() {
-  }
-
-  static class LocalContainerFactory {
-    final AppContext appContext;
-    AtomicInteger nextId;
-
-    public LocalContainerFactory(AppContext appContext) {
-      this.appContext = appContext;
-      this.nextId = new AtomicInteger(1);
-    }
-
-    public Container createContainer(Resource capability, Priority priority) {
-      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
-      NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
-      String nodeHttpAddress = "127.0.0.1:0";
-
-      Container container = Container.newInstance(containerId,
-          nodeId,
-          nodeHttpAddress,
-          capability,
-          priority,
-          null);
-
-      return container;
-    }
-  }
-
-  static class TaskRequest implements Comparable<TaskRequest> {
-    // Higher prority than Priority.UNDEFINED
-    static final int HIGHEST_PRIORITY = -2;
-    Object task;
-    Priority priority;
-
-    public TaskRequest(Object task, Priority priority) {
-      this.task = task;
-      this.priority = priority;
-    }
-
-    @Override
-    public int compareTo(TaskRequest request) {
-      return request.priority.compareTo(this.priority);
-    }
-  }
-
-  static class AllocateTaskRequest extends TaskRequest {
-    Resource capability;
-    Object clientCookie;
-
-    public AllocateTaskRequest(Object task, Resource capability, Priority priority,
-        Object clientCookie) {
-      super(task, priority);
-      this.capability = capability;
-      this.clientCookie = clientCookie;
-    }
-  }
-
-  static class DeallocateTaskRequest extends TaskRequest {
-    static final Priority DEALLOCATE_PRIORITY = Priority.newInstance(HIGHEST_PRIORITY);
-
-    public DeallocateTaskRequest(Object task) {
-      super(task, DEALLOCATE_PRIORITY);
-    }
-  }
-
-  static class AsyncDelegateRequestHandler implements Runnable {
-    final BlockingQueue<TaskRequest> taskRequestQueue;
-    final LocalContainerFactory localContainerFactory;
-    final HashMap<Object, Container> taskAllocations;
-    final TaskSchedulerAppCallback appClientDelegate;
-    final int MAX_TASKS;
-
-    AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
-        LocalContainerFactory localContainerFactory,
-        HashMap<Object, Container> taskAllocations,
-        TaskSchedulerAppCallback appClientDelegate,
-        Configuration conf) {
-      this.taskRequestQueue = taskRequestQueue;
-      this.localContainerFactory = localContainerFactory;
-      this.taskAllocations = taskAllocations;
-      this.appClientDelegate = appClientDelegate;
-      this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
-          TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
-    }
-
-    public void addAllocateTaskRequest(Object task, Resource capability, Priority priority,
-        Object clientCookie) {
-      try {
-        taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    public boolean addDeallocateTaskRequest(Object task) {
-      try {
-        taskRequestQueue.put(new DeallocateTaskRequest(task));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      synchronized(taskRequestQueue) {
-        taskRequestQueue.notify();
-      }
-      return true;
-    }
-
-    boolean shouldWait() {
-      return taskAllocations.size() >= MAX_TASKS;
-    }
-
-    @Override
-    public void run() {
-      while(true) {
-        synchronized(taskRequestQueue) {
-          try {
-            if (shouldWait()) {
-              taskRequestQueue.wait();
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-        processRequest();
-      }
-    }
-
-    void processRequest() {
-        try {
-          TaskRequest request = taskRequestQueue.take();
-          if (request instanceof AllocateTaskRequest) {
-            allocateTask((AllocateTaskRequest)request);
-          }
-          else if (request instanceof DeallocateTaskRequest) {
-            deallocateTask((DeallocateTaskRequest)request);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        } catch (NullPointerException e) {
-          LOG.warn("Task request was badly constructed");
-        }
-    }
-
-    void allocateTask(AllocateTaskRequest request) {
-      Container container = localContainerFactory.createContainer(request.capability,
-          request.priority);
-      taskAllocations.put(request.task, container);
-      appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
-    }
-
-    void deallocateTask(DeallocateTaskRequest request) {
-      Container container = taskAllocations.remove(request.task);
-      if (container != null) {
-        appClientDelegate.containerBeingReleased(container.getId());
-      }
-      else {
-        LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
new file mode 100644
index 0000000..22f8557
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -0,0 +1,335 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LocalTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(LocalTaskSchedulerService.class);
+
+  final TaskSchedulerAppCallback realAppClient;
+  final TaskSchedulerAppCallback appClientDelegate;
+  final ContainerSignatureMatcher containerSignatureMatcher;
+  final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+  AsyncDelegateRequestHandler taskRequestHandler;
+  Thread asyncDelegateRequestThread;
+  final ExecutorService appCallbackExecutor;
+
+  final HashMap<Object, Container> taskAllocations;
+  final String appHostName;
+  final int appHostPort;
+  final String appTrackingUrl;
+  final AppContext appContext;
+
+  public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
+      ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
+      int appHostPort, String appTrackingUrl, AppContext appContext) {
+    super(LocalTaskSchedulerService.class.getName());
+    this.realAppClient = appClient;
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
+    this.appContext = appContext;
+    taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
+    taskAllocations = new LinkedHashMap<Object, Container>();
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    Resource freeResources = Resource.newInstance(
+        (int)Runtime.getRuntime().freeMemory()/(1024*1024),
+        Runtime.getRuntime().availableProcessors());
+    return freeResources;
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return 1;
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    Resource totalResources = Resource.newInstance(
+        (int)Runtime.getRuntime().maxMemory()/(1024*1024),
+        Runtime.getRuntime().availableProcessors());
+    return totalResources;
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts,
+      String[] racks, Priority priority, Object containerSignature,
+      Object clientCookie) {
+    taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+  }
+
+  @Override
+  public synchronized void allocateTask(Object task, Resource capability,
+      ContainerId containerId, Priority priority, Object containerSignature,
+      Object clientCookie) {
+    // in local mode every task is already container level local
+    taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+  }
+  
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    return taskRequestHandler.addDeallocateTaskRequest(task);
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    return null;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+      taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
+        new LocalContainerFactory(appContext),
+        taskAllocations,
+        appClientDelegate,
+        conf);
+    asyncDelegateRequestThread = new Thread(taskRequestHandler);
+  }
+
+  @Override
+  public void serviceStart() {
+    asyncDelegateRequestThread.start();
+  }
+
+  @Override
+  public void serviceStop() throws InterruptedException {
+    if (asyncDelegateRequestThread != null) {
+      asyncDelegateRequestThread.interrupt();
+    }
+    appCallbackExecutor.shutdownNow();
+    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void setShouldUnregister() {
+  }
+
+  static class LocalContainerFactory {
+    final AppContext appContext;
+    AtomicInteger nextId;
+
+    public LocalContainerFactory(AppContext appContext) {
+      this.appContext = appContext;
+      this.nextId = new AtomicInteger(1);
+    }
+
+    public Container createContainer(Resource capability, Priority priority) {
+      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
+      String nodeHttpAddress = "127.0.0.1:0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+
+  static class TaskRequest implements Comparable<TaskRequest> {
+    // Higher prority than Priority.UNDEFINED
+    static final int HIGHEST_PRIORITY = -2;
+    Object task;
+    Priority priority;
+
+    public TaskRequest(Object task, Priority priority) {
+      this.task = task;
+      this.priority = priority;
+    }
+
+    @Override
+    public int compareTo(TaskRequest request) {
+      return request.priority.compareTo(this.priority);
+    }
+  }
+
+  static class AllocateTaskRequest extends TaskRequest {
+    Resource capability;
+    Object clientCookie;
+
+    public AllocateTaskRequest(Object task, Resource capability, Priority priority,
+        Object clientCookie) {
+      super(task, priority);
+      this.capability = capability;
+      this.clientCookie = clientCookie;
+    }
+  }
+
+  static class DeallocateTaskRequest extends TaskRequest {
+    static final Priority DEALLOCATE_PRIORITY = Priority.newInstance(HIGHEST_PRIORITY);
+
+    public DeallocateTaskRequest(Object task) {
+      super(task, DEALLOCATE_PRIORITY);
+    }
+  }
+
+  static class AsyncDelegateRequestHandler implements Runnable {
+    final BlockingQueue<TaskRequest> taskRequestQueue;
+    final LocalContainerFactory localContainerFactory;
+    final HashMap<Object, Container> taskAllocations;
+    final TaskSchedulerAppCallback appClientDelegate;
+    final int MAX_TASKS;
+
+    AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
+        LocalContainerFactory localContainerFactory,
+        HashMap<Object, Container> taskAllocations,
+        TaskSchedulerAppCallback appClientDelegate,
+        Configuration conf) {
+      this.taskRequestQueue = taskRequestQueue;
+      this.localContainerFactory = localContainerFactory;
+      this.taskAllocations = taskAllocations;
+      this.appClientDelegate = appClientDelegate;
+      this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+          TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+    }
+
+    public void addAllocateTaskRequest(Object task, Resource capability, Priority priority,
+        Object clientCookie) {
+      try {
+        taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public boolean addDeallocateTaskRequest(Object task) {
+      try {
+        taskRequestQueue.put(new DeallocateTaskRequest(task));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      synchronized(taskRequestQueue) {
+        taskRequestQueue.notify();
+      }
+      return true;
+    }
+
+    boolean shouldWait() {
+      return taskAllocations.size() >= MAX_TASKS;
+    }
+
+    @Override
+    public void run() {
+      while(true) {
+        synchronized(taskRequestQueue) {
+          try {
+            if (shouldWait()) {
+              taskRequestQueue.wait();
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        processRequest();
+      }
+    }
+
+    void processRequest() {
+        try {
+          TaskRequest request = taskRequestQueue.take();
+          if (request instanceof AllocateTaskRequest) {
+            allocateTask((AllocateTaskRequest)request);
+          }
+          else if (request instanceof DeallocateTaskRequest) {
+            deallocateTask((DeallocateTaskRequest)request);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (NullPointerException e) {
+          LOG.warn("Task request was badly constructed");
+        }
+    }
+
+    void allocateTask(AllocateTaskRequest request) {
+      Container container = localContainerFactory.createContainer(request.capability,
+          request.priority);
+      taskAllocations.put(request.task, container);
+      appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
+    }
+
+    void deallocateTask(DeallocateTaskRequest request) {
+      Container container = taskAllocations.remove(request.task);
+      if (container != null) {
+        appClientDelegate.containerBeingReleased(container.getId());
+      }
+      else {
+        LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
+      }
+    }
+  }
+}


[2/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an Interface. Contributed by Jeff Zhang.

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
new file mode 100644
index 0000000..823ce47
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -0,0 +1,106 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public abstract class TaskSchedulerService extends AbstractService{
+
+  public TaskSchedulerService(String name) {
+    super(name);
+  }
+
+  public abstract Resource getAvailableResources();
+
+  public abstract int getClusterNodeCount();
+
+  public abstract void resetMatchLocalityForAllHeldContainers();
+
+  public abstract Resource getTotalResources();
+
+  public abstract void blacklistNode(NodeId nodeId);
+
+  public abstract void unblacklistNode(NodeId nodeId);
+
+  public abstract void allocateTask(Object task, Resource capability,
+      String[] hosts, String[] racks, Priority priority,
+      Object containerSignature, Object clientCookie);
+  
+  /**
+   * Allocate affinitized to a specific container
+   */
+  public abstract void allocateTask(Object task, Resource capability,
+      ContainerId containerId, Priority priority, Object containerSignature,
+      Object clientCookie);
+  
+  public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
+
+  public abstract Object deallocateContainer(ContainerId containerId);
+
+  public abstract void setShouldUnregister();
+  
+  public interface TaskSchedulerAppCallback {
+    public class AppFinalStatus {
+      public final FinalApplicationStatus exitStatus;
+      public final String exitMessage;
+      public final String postCompletionTrackingUrl;
+      public AppFinalStatus(FinalApplicationStatus exitStatus,
+                             String exitMessage,
+                             String posCompletionTrackingUrl) {
+        this.exitStatus = exitStatus;
+        this.exitMessage = exitMessage;
+        this.postCompletionTrackingUrl = posCompletionTrackingUrl;
+      }
+    }
+    // upcall to app must be outside locks
+    public void taskAllocated(Object task,
+                               Object appCookie,
+                               Container container);
+    // this may end up being called for a task+container pair that the app
+    // has not heard about. this can happen because of a race between
+    // taskAllocated() upcall and deallocateTask() downcall
+    public void containerCompleted(Object taskLastAllocated,
+                                    ContainerStatus containerStatus);
+    public void containerBeingReleased(ContainerId containerId);
+    public void nodesUpdated(List<NodeReport> updatedNodes);
+    public void appShutdownRequested();
+    public void setApplicationRegistrationData(
+                                Resource maxContainerCapability,
+                                Map<ApplicationAccessType, String> appAcls,
+                                ByteBuffer clientAMSecretKey
+                                );
+    public void onError(Throwable t);
+    public float getProgress();
+    public void preemptContainer(ContainerId containerId);
+    public AppFinalStatus getFinalAppStatus();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
new file mode 100644
index 0000000..597b24f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -0,0 +1,1951 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/* TODO not yet updating cluster nodes on every allocate response
+ * from RMContainerRequestor
+   import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+    if (clusterNmCount != lastClusterNmCount) {
+      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+          + clusterNmCount);
+      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+    }
+ */
+public class YarnTaskSchedulerService extends TaskSchedulerService
+                             implements AMRMClientAsync.CallbackHandler {
+  private static final Log LOG = LogFactory.getLog(YarnTaskSchedulerService.class);
+
+
+
+  // Client used to talk to the YARN ResourceManager.
+  final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
+  // The application callback exactly as handed to the constructor.
+  final TaskSchedulerAppCallback realAppClient;
+  // Callback used for upcalls; produced by createAppCallbackDelegate().
+  final TaskSchedulerAppCallback appClientDelegate;
+  final ContainerSignatureMatcher containerSignatureMatcher;
+  // Executor passed to the callback delegate; shut down in serviceStop().
+  ExecutorService appCallbackExecutor;
+
+  // Container Re-Use configuration (read from the Configuration in serviceInit())
+  private boolean shouldReuseContainers;
+  private boolean reuseRackLocal;
+  private boolean reuseNonLocal;
+
+  // Pending container requests keyed by task - presumably; TODO confirm
+  Map<Object, CookieContainerRequest> taskRequests =
+                  new HashMap<Object, CookieContainerRequest>();
+  // LinkedHashMap is needed in getProgress()
+  LinkedHashMap<Object, Container> taskAllocations =
+                  new LinkedHashMap<Object, Container>();
+  /**
+   * Tracks last task assigned to a known container.
+   */
+  Map<ContainerId, Object> containerAssignments =
+                  new HashMap<ContainerId, Object>();
+  // Remove inUse depending on resolution of TEZ-1129
+  Set<ContainerId> inUseContainers = Sets.newHashSet(); 
+  // Containers released by the AM, mapped to the task they were last
+  // allocated to; entries are removed in onContainersCompleted().
+  HashMap<ContainerId, Object> releasedContainers =
+                  new HashMap<ContainerId, Object>();
+  /**
+   * Map of containers currently being held by the TaskScheduler.
+   */
+  Map<ContainerId, HeldContainer> heldContainers =
+      new HashMap<ContainerId, HeldContainer>();
+  
+  Set<Priority> priorityHasAffinity = Sets.newHashSet();
+  
+  // Backed by a ConcurrentHashMap; maintained by blacklistNode()/unblacklistNode().
+  Set<NodeId> blacklistedNodes = Collections
+      .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
+  
+  // Initialized lazily in getProgress() from the available resources.
+  Resource totalResources = Resource.newInstance(0, 0);
+  Resource allocatedResources = Resource.newInstance(0, 0);
+  
+  // Values passed to registerApplicationMaster() in serviceStart().
+  final String appHostName;
+  final int appHostPort;
+  final String appTrackingUrl;
+  final AppContext appContext;
+
+  // Set to true in serviceStop(); RM callbacks return early once it is set.
+  AtomicBoolean isStopped = new AtomicBoolean(false);
+
+  // Assigners applied in node-local, rack-local, non-local order.
+  private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
+  private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
+  private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
+
+  // Created in serviceInit(), started in serviceStart().
+  DelayedContainerManager delayedContainerManager;
+  // Both delays are in milliseconds and are validated in serviceInit();
+  // sessionDelay may also be -1.
+  long localitySchedulingDelay;
+  long sessionDelay;
+
+  // Set by setShouldUnregister(); serviceStop() unregisters from the RM only
+  // when this is true.
+  @VisibleForTesting
+  protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
+
+  /**
+   * Cookie attached to a container request. It carries the task the request
+   * was made for, the cookie supplied by the app and the container signature.
+   */
+  class CRCookie {
+    // Always go through the getters below rather than reading these fields
+    // directly; direct access can cause mocked unit tests to fail.
+    private Object task;
+    private Object appCookie;
+    private Object containerSignature;
+
+    CRCookie(Object task, Object appCookie, Object containerSignature) {
+      this.containerSignature = containerSignature;
+      this.appCookie = appCookie;
+      this.task = task;
+    }
+
+    /** @return the container signature given at construction */
+    Object getContainerSignature() {
+      return this.containerSignature;
+    }
+
+    /** @return the app cookie given at construction */
+    Object getAppCookie() {
+      return this.appCookie;
+    }
+
+    /** @return the task given at construction */
+    Object getTask() {
+      return this.task;
+    }
+  }
+
+  /**
+   * A YARN container request that additionally carries a {@link CRCookie} and,
+   * optionally, the id of a container the request has affinity to.
+   */
+  class CookieContainerRequest extends ContainerRequest {
+    CRCookie cookie;
+    ContainerId affinitizedContainerId;
+
+    /**
+     * Creates a request without container affinity; the affinitized
+     * container id stays {@code null}.
+     */
+    public CookieContainerRequest(
+        Resource capability,
+        String[] hosts,
+        String[] racks,
+        Priority priority,
+        CRCookie cookie) {
+      this(capability, null, hosts, racks, priority, cookie);
+    }
+
+    /**
+     * Creates a request that remembers {@code containerId} as the container
+     * it has affinity to.
+     */
+    public CookieContainerRequest(
+        Resource capability,
+        ContainerId containerId,
+        String[] hosts,
+        String[] racks,
+        Priority priority,
+        CRCookie cookie) {
+      super(capability, hosts, racks, priority);
+      this.cookie = cookie;
+      this.affinitizedContainerId = containerId;
+    }
+
+    /** @return the affinitized container id, or {@code null} if none was given */
+    ContainerId getAffinitizedContainer() {
+      return this.affinitizedContainerId;
+    }
+
+    /** @return the cookie attached to this request */
+    CRCookie getCookie() {
+      return this.cookie;
+    }
+  }
+
+  /**
+   * Creates a scheduler that builds its own RM client.
+   *
+   * @param appClient callback through which the app is notified
+   * @param containerSignatureMatcher matcher stored for later use by the scheduler
+   * @param appHostName host name passed to the RM when registering
+   * @param appHostPort port passed to the RM when registering
+   * @param appTrackingUrl tracking URL passed to the RM when registering
+   * @param appContext application context
+   */
+  public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
+                        ContainerSignatureMatcher containerSignatureMatcher,
+                        String appHostName,
+                        int appHostPort,
+                        String appTrackingUrl,
+                        AppContext appContext) {
+    super(YarnTaskSchedulerService.class.getName());
+    this.realAppClient = appClient;
+    // The executor must exist before the delegate is created, because
+    // createAppCallbackDelegate() reads appCallbackExecutor.
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    // 1000 is presumably an initial heartbeat interval in ms - TODO confirm;
+    // serviceInit() sets the interval from configuration afterwards.
+    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
+    this.appContext = appContext;
+  }
+
+  /**
+   * Variant of the public constructor that uses the supplied RM client
+   * instead of creating one; intended for tests.
+   *
+   * @param client the RM client to use
+   */
+  @Private
+  @VisibleForTesting
+  YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
+      ContainerSignatureMatcher containerSignatureMatcher,
+      String appHostName,
+      int appHostPort,
+      String appTrackingUrl,
+      TezAMRMClientAsync<CookieContainerRequest> client,
+      AppContext appContext) {
+    super(YarnTaskSchedulerService.class.getName());
+    this.realAppClient = appClient;
+    // The executor must exist before the delegate is created, because
+    // createAppCallbackDelegate() reads appCallbackExecutor.
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.amRmClient = client;
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
+    this.appContext = appContext;
+  }
+
+  /**
+   * Builds the executor on which app callbacks are run: a single daemon
+   * thread named "TaskSchedulerAppCaller #n".
+   *
+   * @return a new single-threaded executor service
+   */
+  @VisibleForTesting
+  ExecutorService createAppCallbackExecutorService() {
+    ThreadFactoryBuilder callerThreads = new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d")
+        .setDaemon(true);
+    return Executors.newSingleThreadExecutor(callerThreads.build());
+  }
+  
+  /**
+   * @return the available resources as reported by the RM client
+   */
+  @Override
+  public Resource getAvailableResources() {
+    return amRmClient.getAvailableResources();
+  }
+
+  /**
+   * @return the cluster node count as reported by the RM client
+   */
+  @Override
+  public int getClusterNodeCount() {
+    // this can potentially be cheaper after YARN-1722
+    return amRmClient.getClusterNodeCount();
+  }
+
+  /**
+   * Wraps the given app callback in a {@code TaskSchedulerAppCallbackWrapper}
+   * that is given {@code appCallbackExecutor}. Called from the constructors,
+   * so {@code appCallbackExecutor} must already be assigned.
+   *
+   * @param realAppClient the callback to wrap
+   * @return the wrapping callback
+   */
+  TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  /**
+   * Marks that the application should be unregistered from the RM when this
+   * service is stopped; the flag is read in serviceStop().
+   */
+  @Override
+  public void setShouldUnregister() {
+    this.shouldUnregister.set(true);
+  }
+
+  // AbstractService methods
+  @Override
+  public synchronized void serviceInit(Configuration conf) {
+
+    amRmClient.init(conf);
+    int heartbeatIntervalMax = conf.getInt(
+        TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
+        TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
+    amRmClient.setHeartbeatInterval(heartbeatIntervalMax);
+
+    shouldReuseContainers = conf.getBoolean(
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
+    reuseRackLocal = conf.getBoolean(
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
+    reuseNonLocal = conf
+      .getBoolean(
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
+    Preconditions.checkArgument(
+      ((!reuseRackLocal && !reuseNonLocal) || (reuseRackLocal)),
+      "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
+      + " enabled");
+
+    localitySchedulingDelay = conf.getLong(
+      TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
+      TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
+    Preconditions.checkArgument(localitySchedulingDelay >= 0,
+        "Locality Scheduling delay should be >=0");
+
+    sessionDelay = conf.getLong(
+        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS,
+        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT);
+    Preconditions.checkArgument(sessionDelay >= 0 || sessionDelay == -1,
+      "Session delay should be either -1 or >=0");
+
+    delayedContainerManager = new DelayedContainerManager();
+    LOG.info("TaskScheduler initialized with configuration: " +
+            "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
+            ", containerReuseEnabled: " + shouldReuseContainers +
+            ", reuseRackLocal: " + reuseRackLocal +
+            ", reuseNonLocal: " + reuseNonLocal + 
+            ", localitySchedulingDelay: " + localitySchedulingDelay +
+            ", sessionDelay=" + sessionDelay);
+  }
+
+  /**
+   * Starts the RM client, registers the application master with the RM,
+   * hands the registration data to the app and starts the delayed container
+   * manager.
+   *
+   * @throws TezUncheckedException wrapping any YarnException or IOException
+   *           raised while registering
+   */
+  @Override
+  public void serviceStart() {
+    try {
+      RegisterApplicationMasterResponse response;
+      // start and register while holding this object's lock
+      synchronized (this) {
+        amRmClient.start();
+        response = amRmClient.registerApplicationMaster(appHostName,
+                                                        appHostPort,
+                                                        appTrackingUrl);
+      }
+      // upcall to app outside locks
+      appClientDelegate.setApplicationRegistrationData(
+          response.getMaximumResourceCapability(),
+          response.getApplicationACLs(),
+          response.getClientToAMTokenMasterKey());
+
+      delayedContainerManager.start();
+    } catch (YarnException e) {
+      LOG.error("Yarn Exception while registering", e);
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      LOG.error("IO Exception while registering", e);
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Stops the scheduler: shuts down the delayed container manager, marks the
+   * scheduler as stopped, unregisters from the RM if that was requested via
+   * setShouldUnregister(), then stops the RM client and the callback executor.
+   *
+   * @throws InterruptedException if interrupted while waiting for the delayed
+   *           container manager or the callback executor
+   * @throws TezUncheckedException wrapping any YarnException or IOException
+   *           raised while unregistering
+   */
+  @Override
+  public void serviceStop() throws InterruptedException {
+    // upcall to app outside of locks
+    // NOTE(review): getFinalAppStatus() below is invoked while this object's
+    // lock is held, which does not match the comment above - confirm intent.
+    try {
+      delayedContainerManager.shutdown();
+      // Wait (up to 2 seconds) for containers to be released.
+      delayedContainerManager.join(2000l);
+      synchronized (this) {
+        isStopped.set(true);
+        if (shouldUnregister.get()) {
+          AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+          LOG.info("Unregistering application from RM"
+              + ", exitStatus=" + status.exitStatus
+              + ", exitMessage=" + status.exitMessage
+              + ", trackingURL=" + status.postCompletionTrackingUrl);
+          amRmClient.unregisterApplicationMaster(status.exitStatus,
+              status.exitMessage,
+              status.postCompletionTrackingUrl);
+        }
+      }
+
+      // call client.stop() without lock client will attempt to stop the callback
+      // operation and at the same time the callback operation might be trying
+      // to get our lock.
+      // NOTE(review): if unregistering throws, the stop/shutdown calls below
+      // are skipped - confirm whether that is acceptable.
+      amRmClient.stop();
+      appCallbackExecutor.shutdown();
+      appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
+    } catch (YarnException e) {
+      LOG.error("Yarn Exception while unregistering ", e);
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      LOG.error("IOException while unregistering ", e);
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  // AMRMClientAsync interface methods
+  /**
+   * Handles container completion reports from the RM.
+   *
+   * For every completed container this looks for the task it was last
+   * associated with: first among the containers the AM has already released,
+   * then among the currently assigned containers. Held containers that
+   * complete are dropped from {@code heldContainers} and their resources are
+   * deducted from {@code allocatedResources}. Containers that are known in
+   * neither place are ignored. The app is notified after the lock has been
+   * released.
+   *
+   * @param statuses completion statuses reported by the RM
+   */
+  @Override
+  public void onContainersCompleted(List<ContainerStatus> statuses) {
+    if (isStopped.get()) {
+      return;
+    }
+    Map<Object, ContainerStatus> appContainerStatus =
+                        new HashMap<Object, ContainerStatus>(statuses.size());
+    synchronized (this) {
+      for(ContainerStatus containerStatus : statuses) {
+        ContainerId completedId = containerStatus.getContainerId();
+        HeldContainer delayedContainer = heldContainers.get(completedId);
+
+        Object task = releasedContainers.remove(completedId);
+        if(task != null){
+          if (delayedContainer != null) {
+            LOG.warn("Held container should be null since releasedContainer is not");
+          }
+          // TODO later we may want to check if exit code matched expectation
+          // e.g. successful container should not come back fail exit code after
+          // being released
+          // completion of a container we had released earlier
+          // an allocated container completed. notify app
+          LOG.info("Released container completed:" + completedId +
+                   " last allocated to task: " + task);
+          appContainerStatus.put(task, containerStatus);
+          continue;
+        }
+
+        // not found in released containers. check currently allocated containers
+        // no need to release this container as the RM has already completed it
+        task = unAssignContainer(completedId, false);
+        if (delayedContainer != null) {
+          heldContainers.remove(completedId);
+          // Resources.subtract() works on a copy and returns it, leaving its
+          // arguments untouched; subtractFrom() is the variant that updates
+          // allocatedResources in place.
+          Resources.subtractFrom(allocatedResources,
+              delayedContainer.getContainer().getResource());
+        } else {
+          LOG.warn("Held container expected to be not null for a non-AM-released container");
+        }
+        if(task != null) {
+          // completion of a container we have allocated currently
+          // an allocated container completed. notify app
+          LOG.info("Allocated container completed:" + completedId +
+                   " last allocated to task: " + task);
+          appContainerStatus.put(task, containerStatus);
+          continue;
+        }
+
+        // container neither allocated nor released
+        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+      }
+    }
+
+    // upcall to app must be outside locks
+    for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
+      appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Handles containers newly allocated by the RM. With container re-use
+   * enabled the containers are handed to the delayed-container path;
+   * otherwise they are matched against pending requests right away and the
+   * app is informed of the resulting assignments.
+   *
+   * @param containers containers allocated by the RM
+   */
+  @Override
+  public void onContainersAllocated(List<Container> containers) {
+    if (isStopped.get()) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      StringBuilder allocatedIds = new StringBuilder();
+      for (Container allocated : containers) {
+        allocatedIds.append(allocated.getId()).append(", ");
+      }
+      LOG.debug("Assigned New Containers: " + allocatedIds.toString());
+    }
+
+    Map<CookieContainerRequest, Container> assignments;
+    synchronized (this) {
+      if (shouldReuseContainers) {
+        // unify allocations
+        pushNewContainerToDelayed(containers);
+        return;
+      }
+      // work on a modifiable copy of the RM-provided list
+      List<Container> modifiableCopy = Lists.newLinkedList(containers);
+      assignments = assignNewlyAllocatedContainers(modifiableCopy);
+    }
+
+    // upcall to app must be outside locks
+    informAppAboutAssignments(assignments);
+  }
+
+  /**
+   * Matches newly allocated containers against pending requests, trying the
+   * node-local, rack-local and non-local assigners in that order, and then
+   * releases whatever is still unassigned.
+   *
+   * @param containers
+   *          The containers to be assigned. The collection *may* be modified
+   *          in place based on allocations and releases.
+   * @return Assignments.
+   */
+  private synchronized Map<CookieContainerRequest, Container>
+      assignNewlyAllocatedContainers(Iterable<Container> containers) {
+
+    Map<CookieContainerRequest, Container> assignments =
+        new HashMap<CookieContainerRequest, Container>();
+
+    ContainerAssigner[] assignersByLocality = {
+        NODE_LOCAL_ASSIGNER, RACK_LOCAL_ASSIGNER, NON_LOCAL_ASSIGNER
+    };
+    for (ContainerAssigner assigner : assignersByLocality) {
+      assignNewContainersWithLocation(containers, assigner, assignments);
+    }
+
+    // Release any unassigned containers given by the RM
+    releaseUnassignedContainers(containers);
+
+    return assignments;
+  }
+
+  /**
+   * Tries to match re-used containers against pending requests, honoring
+   * locality: node-local first, then rack-local, then non-local.
+   *
+   * @param containers containers to match
+   * @return Assignments.
+   */
+  private synchronized Map<CookieContainerRequest, Container>
+      tryAssignReUsedContainers(Iterable<Container> containers) {
+
+    Map<CookieContainerRequest, Container> assignments =
+      new HashMap<CookieContainerRequest, Container>();
+
+    // Honor locality and match as many as possible
+    ContainerAssigner[] assignersByLocality = {
+        NODE_LOCAL_ASSIGNER, RACK_LOCAL_ASSIGNER, NON_LOCAL_ASSIGNER
+    };
+    for (ContainerAssigner assigner : assignersByLocality) {
+      assignReUsedContainersWithLocation(containers, assigner, assignments,
+        true);
+    }
+
+    return assignments;
+  }
+
+  /**
+   * Try to assign a re-used (held) container, or decide what else to do
+   * with it.
+   *
+   * Behaviour depends on the AM state:
+   * - IDLE, or no pending task requests: the container's locality level is
+   *   reset; the container is then either released or put back into the
+   *   delayed queue. Nothing is assigned and null is returned.
+   * - RUNNING: node-local, rack-local and non-local matching are attempted
+   *   as far as the current match level and configuration allow. If nothing
+   *   matched, the container is released, kept at an escalated locality
+   *   level, or re-queued. The (possibly empty) assignment map is returned.
+   * - any other state: the container is released and null is returned.
+   *
+   * @param heldContainer Container to be used to assign to tasks
+   * @return Assigned container map; null when no assignment was attempted
+   */
+
+  private synchronized Map<CookieContainerRequest, Container>
+      assignDelayedContainer(HeldContainer heldContainer) {
+
+    DAGAppMasterState state = appContext.getAMState();
+    boolean isNew = heldContainer.isNew();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to assign a delayed container"
+        + ", containerId=" + heldContainer.getContainer().getId()
+        + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+        + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+        + ", AMState=" + state
+        + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+        + ", taskRequestsCount=" + taskRequests.size()
+        + ", heldContainers=" + heldContainers.size()
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+        + ", isNew=" + isNew);
+    }
+
+    if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
+      // reset locality level on held container
+      // if sessionDelay defined, push back into delayed queue if not already
+      // done so
+
+      heldContainer.resetLocalityMatchLevel();
+      long currentTime = System.currentTimeMillis();
+      // release when the container is new, or when its expiry time has passed
+      // and a session delay is in effect (sessionDelay != -1)
+      if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
+          && sessionDelay != -1)) {
+        LOG.info("No taskRequests. Container's session delay expired or is new. " +
+        	"Releasing container"
+          + ", containerId=" + heldContainer.container.getId()
+          + ", containerExpiryTime="
+          + heldContainer.getContainerExpiryTime()
+          + ", sessionDelay=" + sessionDelay
+          + ", taskRequestsCount=" + taskRequests.size()
+          + ", heldContainers=" + heldContainers.size()
+          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+          + ", isNew=" + isNew);
+        releaseUnassignedContainers(
+            Lists.newArrayList(heldContainer.container));
+      } else {
+        if (!appContext.isSession()) {
+          releaseUnassignedContainers(
+            Lists.newArrayList(heldContainer.container));
+        } else {
+          // only put back in queue if this is a session
+          // (the locality level was already reset at the top of this branch)
+          heldContainer.resetLocalityMatchLevel();
+          delayedContainerManager.addDelayedContainer(
+            heldContainer.getContainer(),
+            currentTime + localitySchedulingDelay);
+        }
+      }
+    } else if (state.equals(DAGAppMasterState.RUNNING)) {
+      // match level as it was on entry; later escalation does not change
+      // this local
+      HeldContainer.LocalityMatchLevel localityMatchLevel =
+        heldContainer.getLocalityMatchLevel();
+      Map<CookieContainerRequest, Container> assignedContainers =
+        new HashMap<CookieContainerRequest, Container>();
+
+      Container containerToAssign = heldContainer.container;
+
+      heldContainer.incrementAssignmentAttempts();
+      // Each time a container is seen, we try node, rack and non-local in that
+      // order depending on matching level allowed
+
+      // if match level is NEW or NODE, match only at node-local
+      // always try node local matches for other levels
+      // NOTE(review): this condition names every match level used in this
+      // method, so it looks like it always holds - confirm against the enum.
+      if (isNew
+          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
+          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
+          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
+          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
+        assignReUsedContainerWithLocation(containerToAssign,
+            NODE_LOCAL_ASSIGNER, assignedContainers, true);
+        // NOTE(review): guarded by isDebugEnabled() but logged at INFO, here
+        // and in the rack and non-local attempts below - confirm intent.
+        if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+          LOG.info("Failed to assign tasks to delayed container using node"
+            + ", containerId=" + heldContainer.getContainer().getId());
+        }
+      }
+
+      // if re-use allowed at rack
+      // match against rack if match level is RACK or NON-LOCAL
+      // if scheduling delay is 0, match at RACK allowed without a sleep
+      if (assignedContainers.isEmpty()) {
+        if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
+          (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
+            || localityMatchLevel.equals(
+              HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
+          assignReUsedContainerWithLocation(containerToAssign,
+              RACK_LOCAL_ASSIGNER, assignedContainers, false);
+          if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+            LOG.info("Failed to assign tasks to delayed container using rack"
+              + ", containerId=" + heldContainer.getContainer().getId());
+          }
+        }
+      }
+
+      // if re-use allowed at non-local
+      // match against non-local if match level is NON-LOCAL
+      // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
+      if (assignedContainers.isEmpty()) {
+        if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
+            || localityMatchLevel.equals(
+                HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
+         assignReUsedContainerWithLocation(containerToAssign,
+              NON_LOCAL_ASSIGNER, assignedContainers, false);
+          if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+            LOG.info("Failed to assign tasks to delayed container using non-local"
+                + ", containerId=" + heldContainer.getContainer().getId());
+          }
+        }
+      }
+
+      // nothing matched at any permitted level: decide between releasing the
+      // container and trying again later
+      if (assignedContainers.isEmpty()) {
+
+        long currentTime = System.currentTimeMillis();
+
+        // Release container if final expiry time is reached
+        // Dont release a new container. The RM may not give us new ones
+        if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
+          && sessionDelay != -1) {
+          LOG.info("Container's session delay expired. Releasing container"
+            + ", containerId=" + heldContainer.container.getId()
+            + ", containerExpiryTime="
+            + heldContainer.getContainerExpiryTime()
+            + ", sessionDelay=" + sessionDelay);
+          releaseUnassignedContainers(
+            Lists.newArrayList(heldContainer.container));
+        } else {
+
+          // Let's decide if this container has hit the end of the road
+
+          // EOL true if container's match level is NON-LOCAL
+          boolean hitFinalMatchLevel = localityMatchLevel.equals(
+            HeldContainer.LocalityMatchLevel.NON_LOCAL);
+          if (!hitFinalMatchLevel) {
+            // EOL also true if locality delay is 0
+            // or rack-local or non-local is disabled
+            heldContainer.incrementLocalityMatchLevel();
+            if (localitySchedulingDelay == 0 ||
+                (!reuseRackLocal
+                  || (!reuseNonLocal &&
+                    heldContainer.getLocalityMatchLevel().equals(
+                        HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
+              hitFinalMatchLevel = true;
+            }
+            // the above if-stmt does not apply to new containers since they will
+            // be matched at all locality levels. So there finalMatchLevel cannot
+            // be short-circuited
+            if (localitySchedulingDelay > 0 && isNew) {
+              hitFinalMatchLevel = false;
+            }
+          }
+          
+          if (hitFinalMatchLevel) {
+            boolean safeToRelease = true;
+            Priority topPendingPriority = amRmClient.getTopPriority();
+            Priority containerPriority = heldContainer.container.getPriority();
+            if (isNew && topPendingPriority != null &&
+                containerPriority.compareTo(topPendingPriority) < 0) {
+              // this container is of lower priority and given to us by the RM for
+              // a task that will be matched after the current top priority. Keep 
+              // this container for those pending tasks since the RM is not going
+              // to give this container to us again
+              safeToRelease = false;
+            }
+            
+            // Are there any pending requests at any priority?
+            // release if there are tasks or this is not a session
+            if (safeToRelease && 
+                (!taskRequests.isEmpty() || !appContext.isSession())) {
+              LOG.info("Releasing held container as either there are pending but "
+                + " unmatched requests or this is not a session"
+                + ", containerId=" + heldContainer.container.getId()
+                + ", pendingTasks=" + !taskRequests.isEmpty()
+                + ", isSession=" + appContext.isSession()
+                + ". isNew=" + isNew);
+              releaseUnassignedContainers(
+                Lists.newArrayList(heldContainer.container));
+            } else {
+              // if no tasks, treat this like an idle session
+              heldContainer.resetLocalityMatchLevel();
+              delayedContainerManager.addDelayedContainer(
+                heldContainer.getContainer(),
+                currentTime + localitySchedulingDelay);
+            }
+          } else {
+            // Schedule delay container to match at a later try
+            delayedContainerManager.addDelayedContainer(
+                heldContainer.getContainer(),
+                currentTime + localitySchedulingDelay);
+          }
+        }
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Delayed container assignment successful"
+            + ", containerId=" + heldContainer.getContainer().getId());
+      }
+
+      return assignedContainers;
+    } else {
+      // ignore all other cases?
+      LOG.warn("Received a request to assign re-used containers when AM was "
+        + " in state: " + state + ". Ignoring request and releasing container"
+        + ": " + heldContainer.getContainer().getId());
+      releaseUnassignedContainers(Lists.newArrayList(heldContainer.container));
+    }
+
+    // reached from the IDLE/no-requests branch and from the other-state branch
+    return null;
+  }
+
+  /**
+   * Resets the locality match level of every held container and then
+   * notifies the delayed container manager's monitor.
+   */
+  @Override
+  public synchronized void resetMatchLocalityForAllHeldContainers() {
+    Collection<HeldContainer> currentlyHeld = heldContainers.values();
+    for (HeldContainer held : currentlyHeld) {
+      held.resetLocalityMatchLevel();
+    }
+    // notify() must be called while owning the monitor of its target
+    synchronized (delayedContainerManager) {
+      delayedContainerManager.notify();
+    }
+  }
+
+  /**
+   * Passes a shutdown request on to the app unless the scheduler has
+   * already been stopped.
+   */
+  @Override
+  public void onShutdownRequest() {
+    if (!isStopped.get()) {
+      // upcall to app must be outside locks
+      appClientDelegate.appShutdownRequested();
+    }
+  }
+
+  /**
+   * Passes node updates on to the app unless the scheduler has already been
+   * stopped.
+   *
+   * @param updatedNodes reports for the nodes that were updated
+   */
+  @Override
+  public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    if (!isStopped.get()) {
+      // ignore bad nodes for now
+      // upcall to app must be outside locks
+      appClientDelegate.nodesUpdated(updatedNodes);
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (isStopped.get()) {
+      return 1;
+    }
+
+    if(totalResources.getMemory() == 0) {
+      // assume this is the first allocate callback. nothing is allocated.
+      // available resource = totalResource
+      // TODO this will not handle dynamic changes in resources
+      totalResources = Resources.clone(getAvailableResources());
+      LOG.info("App total resource memory: " + totalResources.getMemory() +
+               " cpu: " + totalResources.getVirtualCores() +
+               " taskAllocations: " + taskAllocations.size());
+    }
+
+    preemptIfNeeded();
+
+    return appClientDelegate.getProgress();
+  }
+
+  /**
+   * Passes an error on to the app unless the scheduler has already been
+   * stopped.
+   *
+   * @param t the error to report
+   */
+  @Override
+  public void onError(Throwable t) {
+    if (!isStopped.get()) {
+      appClientDelegate.onError(t);
+    }
+  }
+
+  /**
+   * @return the total resources as last recorded by getProgress(); all zero
+   *         until then
+   */
+  @Override
+  public Resource getTotalResources() {
+    return totalResources;
+  }
+
+  /**
+   * Adds the node to the blacklist held by the RM client and records it in the
+   * local set of blacklisted nodes.
+   *
+   * @param nodeId
+   *          the node to blacklist.
+   */
+  @Override
+  public synchronized void blacklistNode(NodeId nodeId) {
+    LOG.info("Blacklisting node: " + nodeId);
+    amRmClient.addNodeToBlacklist(nodeId);
+    // remembered locally so that allocations on this node can be rejected
+    blacklistedNodes.add(nodeId);
+  }
+  
+  /**
+   * Removes the node from the blacklist. The RM client is only updated when
+   * the node was present in the local set of blacklisted nodes.
+   *
+   * @param nodeId
+   *          the node to remove from the blacklist.
+   */
+  @Override
+  public synchronized void unblacklistNode(NodeId nodeId) {
+    final boolean wasBlacklisted = blacklistedNodes.remove(nodeId);
+    if (!wasBlacklisted) {
+      return;
+    }
+    LOG.info("UnBlacklisting node: " + nodeId);
+    amRmClient.removeNodeFromBlacklist(nodeId);
+  }
+  
+  /**
+   * Registers a container request for the task with the given locality
+   * preferences and triggers a scheduling pass.
+   *
+   * @param task
+   *          the task that needs a container.
+   * @param capability
+   *          the resource being requested.
+   * @param hosts
+   *          preferred hosts, may be null.
+   * @param racks
+   *          preferred racks, may be null.
+   * @param priority
+   *          the priority of the request.
+   * @param containerSignature
+   *          signature stored in the request cookie.
+   * @param clientCookie
+   *          opaque app cookie stored in the request cookie.
+   */
+  @Override
+  public synchronized void allocateTask(
+      Object task,
+      Resource capability,
+      String[] hosts,
+      String[] racks,
+      Priority priority,
+      Object containerSignature,
+      Object clientCookie) {
+
+    // XXX Have ContainerContext implement an interface defined by TaskScheduler.
+    // TODO check for nulls etc
+    // TODO extra memory allocation
+    CookieContainerRequest containerRequest = new CookieContainerRequest(
+        capability, hosts, racks, priority,
+        new CRCookie(task, clientCookie, containerSignature));
+
+    addRequestAndTrigger(task, containerRequest, hosts, racks);
+  }
+  
+  /**
+   * Registers a container request for the task that is affinitized to an
+   * existing container. When that container is held and can fit the requested
+   * capability, its host is used as the only locality hint and the priority is
+   * recorded as having affinity. Otherwise a warning is logged and the request
+   * is made without locality hints.
+   *
+   * @param task
+   *          the task that needs a container.
+   * @param capability
+   *          the resource being requested.
+   * @param containerId
+   *          the container the task would like to be matched with.
+   * @param priority
+   *          the priority of the request.
+   * @param containerSignature
+   *          signature stored in the request cookie.
+   * @param clientCookie
+   *          opaque app cookie stored in the request cookie.
+   */
+  @Override
+  public synchronized void allocateTask(
+      Object task,
+      Resource capability,
+      ContainerId containerId,
+      Priority priority,
+      Object containerSignature,
+      Object clientCookie) {
+
+    String[] hosts = null;
+    String[] racks = null;
+    HeldContainer affinitized = heldContainers.get(containerId);
+    if (affinitized == null) {
+      LOG.warn("Matching requested to unknown container: " + containerId);
+    } else {
+      Container target = affinitized.getContainer();
+      if (!canFit(capability, target.getResource())) {
+        LOG.warn("Matching requested to container: " + containerId
+            + " but requested capability: " + capability
+            + " does not fit in container resource: " + target.getResource());
+      } else {
+        // just specify node and use YARN's soft locality constraint for the rest
+        hosts = new String[] { target.getNodeId().getHost() };
+        priorityHasAffinity.add(priority);
+      }
+    }
+
+    CookieContainerRequest containerRequest = new CookieContainerRequest(
+        capability, containerId, hosts, racks, priority,
+        new CRCookie(task, clientCookie, containerSignature));
+
+    addRequestAndTrigger(task, containerRequest, hosts, racks);
+  }
+  
+  /**
+   * Records the request for the task, asks the delayed container manager to
+   * try assigning containers, and logs the request together with its first
+   * host and rack (or "null" when none was given).
+   */
+  private void addRequestAndTrigger(Object task, CookieContainerRequest request,
+      String[] hosts, String[] racks) {
+    addTaskRequest(task, request);
+    // See if any of the delayedContainers can be used for this task.
+    delayedContainerManager.triggerScheduling(true);
+
+    String firstHost = "null";
+    if (hosts != null && hosts.length > 0) {
+      firstHost = hosts[0];
+    }
+    String firstRack = "null";
+    if (racks != null && racks.length > 0) {
+      firstRack = racks[0];
+    }
+    LOG.info("Allocation request for task: " + task
+        + " with request: " + request
+        + " host: " + firstHost
+        + " rack: " + firstRack);
+  }
+
+  /**
+   * De-allocates a task. A task that is still only requested has its request
+   * removed. A task that already holds a container is detached from it; the
+   * container is then either released or, when the task succeeded and reuse is
+   * enabled, offered to another pending task.
+   *
+   * @param task
+   *          the task to de-allocate.
+   * @param taskSucceeded
+   *          specify whether the task succeeded or failed.
+   * @return true if the task had a container allocated to it; false if the
+   *         task was only requested or is unknown.
+   */
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    // filled in under the lock, reported to the app after the lock is released
+    Map<CookieContainerRequest, Container> assignedContainers = null;
+
+    synchronized (this) {
+      CookieContainerRequest request = removeTaskRequest(task);
+      if (request != null) {
+        // task not allocated yet
+        LOG.info("Deallocating task: " + task + " before allocation");
+        return false;
+      }
+
+      // task request not present. Look in allocations
+      Container container = doBookKeepingForTaskDeallocate(task);
+      if (container == null) {
+        // task neither requested nor allocated.
+        LOG.info("Ignoring removal of unknown task: " + task);
+        return false;
+      } else {
+        LOG.info("Deallocated task: " + task + " from container: "
+            + container.getId());
+
+        // a container is only kept when its task succeeded and reuse is on
+        if (!taskSucceeded || !shouldReuseContainers) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Releasing container, containerId=" + container.getId()
+                + ", taskSucceeded=" + taskSucceeded
+                + ", reuseContainersFlag=" + shouldReuseContainers);
+          }
+          releaseContainer(container.getId());
+        } else {
+          // Don't attempt to delay containers if delay is 0.
+          HeldContainer heldContainer = heldContainers.get(container.getId());
+          if (heldContainer != null) {
+            heldContainer.resetLocalityMatchLevel();
+            long currentTime = System.currentTimeMillis();
+            // the expiry time is only refreshed when a session delay is set
+            if (sessionDelay > 0) {
+              heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+            }
+            assignedContainers = assignDelayedContainer(heldContainer);
+          } else {
+            LOG.info("Skipping container after task deallocate as container is"
+                + " no longer running, containerId=" + container.getId());
+          }
+        }
+      }
+    }
+
+    // up call outside of the lock.
+    if (assignedContainers != null && assignedContainers.size() == 1) {
+      informAppAboutAssignments(assignedContainers);
+    }
+    return true;
+  }
+  
+  /**
+   * Un-assigns and releases the given container.
+   *
+   * @param containerId
+   *          the container to de-allocate.
+   * @return the task last assigned to the container, or null when no task is
+   *         recorded for it.
+   */
+  @Override
+  public synchronized Object deallocateContainer(ContainerId containerId) {
+    final Object lastTask = unAssignContainer(containerId, true);
+    if (lastTask == null) {
+      LOG.info("Ignoring dealloction of unknown container: " + containerId);
+      return null;
+    }
+
+    LOG.info("Deallocated container: " + containerId +
+      " from task: " + lastTask);
+    return lastTask;
+  }
+
+  /**
+   * Checks whether one resource fits inside another, comparing both memory and
+   * virtual cores.
+   *
+   * @param arg0
+   *          the resource that is needed.
+   * @param arg1
+   *          the resource that is available.
+   * @return true if arg0 needs no more memory and no more virtual cores than
+   *         arg1 provides.
+   */
+  boolean canFit(Resource arg0, Resource arg1) {
+    final int neededMemory = arg0.getMemory();
+    final int availableMemory = arg1.getMemory();
+    final int neededCores = arg0.getVirtualCores();
+    final int availableCores = arg1.getVirtualCores();
+
+    return neededMemory <= availableMemory && neededCores <= availableCores;
+  }
+  
+  /**
+   * Frees up resources when the highest priority pending request does not fit
+   * into the currently free resources (total minus allocated).
+   * <p>
+   * The steps are tried in this order, all while holding the lock on this
+   * object:
+   * <ol>
+   * <li>If any delayed container is a reused one, or is a new one with fewer
+   * than 3 assignment attempts, nothing is done and the method returns.</li>
+   * <li>Otherwise the lowest priority new delayed container is released, and
+   * one pending request with that same priority (if any) is removed and
+   * submitted again.</li>
+   * <li>If there was no delayed container at all, a running task is chosen
+   * for preemption. It must have a lower priority than the pending request
+   * and its container signature must not be an exact match.</li>
+   * </ol>
+   * The preemption upcall to the app is made after the lock is released.
+   */
+  void preemptIfNeeded() {
+    ContainerId preemptedContainer = null;
+    synchronized (this) {
+      Resource freeResources = Resources.subtract(totalResources,
+        allocatedResources);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
+          " cpu:" + allocatedResources.getVirtualCores() + 
+          " delayedContainers: " + delayedContainerManager.delayedContainers.size());
+      }
+      assert freeResources.getMemory() >= 0;
+  
+      // find the pending request with the highest priority (lowest value)
+      CookieContainerRequest highestPriRequest = null;
+      for(CookieContainerRequest request : taskRequests.values()) {
+        if(highestPriRequest == null) {
+          highestPriRequest = request;
+        } else if(isHigherPriority(request.getPriority(),
+                                     highestPriRequest.getPriority())){
+          highestPriRequest = request;
+        }
+      }
+      if(highestPriRequest != null &&
+         !fitsIn(highestPriRequest.getCapability(), freeResources)) {
+        // highest priority request will not fit in existing free resources
+        // free up some more
+        // TODO this is subject to error wrt RM resource normalization
+        
+        // This request must have been considered for matching with all existing 
+        // containers when request was made.
+        Container lowestPriNewContainer = null;
+        // could not find anything to preempt. Check if we can release unused 
+        // containers
+        for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
+          if (!heldContainer.isNew()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+                  + heldContainer.getContainer().getId());
+            }
+            return;
+          }
+          if (heldContainer.geNumAssignmentAttempts() < 3) {
+            // we havent tried to assign this container at node/rack/ANY
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Brand new container. Wait for assignment loop to match it. "
+                  + heldContainer.getContainer().getId());
+            }
+            return;
+          }
+          Container container = heldContainer.getContainer();
+          if (lowestPriNewContainer == null ||
+              isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
+            // there is a lower priority new container
+            lowestPriNewContainer = container;
+          }
+        }
+        
+        if (lowestPriNewContainer != null) {
+          LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
+              " with priority: " + lowestPriNewContainer.getPriority() + 
+              " to free resource for request: " + highestPriRequest +
+              " . Current free resources: " + freeResources);
+          releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
+          // We are returning an unused resource back the RM. The RM thinks it 
+          // has serviced our initial request and will not re-allocate this back
+          // to us anymore. So we need to ask for this again. If there is no
+          // outstanding request at that priority then its fine to not ask again.
+          // See TEZ-915 for more details
+          for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
+            Object task = entry.getKey();
+            CookieContainerRequest request = entry.getValue();
+            if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
+              LOG.info("Resending request for task again: " + task);
+              // The two calls below remove and re-add the entry in
+              // taskRequests, the map being iterated here, so the loop must
+              // be left right after them.
+              deallocateTask(task, true);
+              allocateTask(task, request.getCapability(), 
+                  (request.getNodes() == null ? null : 
+                    request.getNodes().toArray(new String[request.getNodes().size()])), 
+                    (request.getRacks() == null ? null : 
+                      request.getRacks().toArray(new String[request.getRacks().size()])), 
+                    request.getPriority(), 
+                    request.getCookie().getContainerSignature(),
+                    request.getCookie().getAppCookie());
+              break;
+            }
+          }
+          
+          return;
+        }
+        
+        // this assert will be a no-op in production but can help identify 
+        // invalid assumptions during testing
+        assert delayedContainerManager.delayedContainers.isEmpty();
+        
+        // there are no reused or new containers to release
+        // try to preempt running containers
+        Map.Entry<Object, Container> preemptedEntry = null;
+        for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+          // NOTE(review): assumes every allocated container is still present
+          // in heldContainers; a missing entry would fail on the next line.
+          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+          Priority taskPriority = lastTaskInfo.getPriority();
+          Object signature = lastTaskInfo.getCookie().getContainerSignature();
+          if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+            // higher or same priority
+            continue;
+          }
+          if (containerSignatureMatcher.isExactMatch(
+              highestPriRequest.getCookie().getContainerSignature(),
+              signature)) {
+            // exact match with different priorities
+            continue;
+          }
+          // NOTE(review): the candidate's task priority is compared with the
+          // priority of the current pick's container, not of its last task.
+          if(preemptedEntry == null ||
+             !isHigherPriority(taskPriority, 
+                 preemptedEntry.getValue().getPriority())) {
+            // keep the lower priority or the one added later
+            preemptedEntry = entry;
+          }
+        }
+        if(preemptedEntry != null) {
+          // found something to preempt
+          LOG.info("Preempting task: " + preemptedEntry.getKey() +
+              " to free resource for request: " + highestPriRequest +
+              " . Current free resources: " + freeResources);
+          preemptedContainer = preemptedEntry.getValue().getId();
+          // app client will be notified when after container is killed
+          // and we get its completed container status
+        }
+      }
+    }
+    
+    // upcall outside locks
+    if (preemptedContainer != null) {
+      appClientDelegate.preemptContainer(preemptedContainer);
+    }
+  }
+
+  /**
+   * Checks whether a resource fits into another one. Only memory is compared.
+   *
+   * @return true if resource has at least as much memory as toFit.
+   */
+  private boolean fitsIn(Resource toFit, Resource resource) {
+    // YARN-893 prevents using correct library code
+    //return Resources.fitsIn(toFit, resource);
+    final int availableMemory = resource.getMemory();
+    final int neededMemory = toFit.getMemory();
+    return availableMemory >= neededMemory;
+  }
+
+  /**
+   * Looks up the pending requests that match the container's own priority and
+   * resource at the given location, and returns the first one that can be
+   * assigned to the container.
+   *
+   * @param container
+   *          the container to find a request for.
+   * @param location
+   *          the location to match requests against.
+   * @return the first assignable request, or null if there is none.
+   */
+  private CookieContainerRequest getMatchingRequestWithPriority(
+      Container container,
+      String location) {
+    List<? extends Collection<CookieContainerRequest>> candidateGroups =
+        amRmClient.getMatchingRequests(
+            container.getPriority(), location, container.getResource());
+
+    // pick first one; an empty list simply falls through to the null return
+    for (Collection<CookieContainerRequest> group : candidateGroups) {
+      for (CookieContainerRequest candidate : group) {
+        if (canAssignTaskToContainer(candidate, container)) {
+          return candidate;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Finds a pending request for the container among the requests returned for
+   * the top priority at the given location, ignoring the container's own
+   * priority.
+   * <p>
+   * Without affinity handling the first assignable request is returned. With
+   * affinity handling a request affinitized to exactly this container is
+   * preferred. A request affinitized to another container that is still held
+   * and not in use is skipped. Any other assignable request is remembered as
+   * the fallback result.
+   *
+   * @param container
+   *          the container to find a request for.
+   * @param location
+   *          the location to match requests against.
+   * @param considerContainerAffinity
+   *          whether container affinity should be considered. It is ignored
+   *          when the top priority has no affinitized requests recorded.
+   * @return the chosen request, or null if nothing matches.
+   */
+  private CookieContainerRequest getMatchingRequestWithoutPriority(
+      Container container,
+      String location,
+      boolean considerContainerAffinity) {
+    Resource capability = container.getResource();
+    List<? extends Collection<CookieContainerRequest>> pRequestsList =
+      amRmClient.getMatchingRequestsForTopPriority(location, capability);
+    // affinity only matters if some request at the top priority asked for it
+    if (considerContainerAffinity && 
+        !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
+      considerContainerAffinity = false;
+    }
+    if (pRequestsList == null || pRequestsList.isEmpty()) {
+      return null;
+    }
+    // fallback used when no container level match is found
+    CookieContainerRequest firstMatch = null;
+    for (Collection<CookieContainerRequest> requests : pRequestsList) {
+      for (CookieContainerRequest cookieContainerRequest : requests) {
+        if (firstMatch == null || // we dont have a match. So look for one 
+            // we have a match but are looking for a better container level match.
+            // skip the expensive canAssignTaskToContainer() if the request is 
+            // not affinitized to the container
+            container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
+            ) {
+          if (canAssignTaskToContainer(cookieContainerRequest, container)) {
+            // request matched to container
+            if (!considerContainerAffinity) {
+              return cookieContainerRequest;
+            }
+            ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
+            boolean canMatchTaskWithAffinity = true;
+            if (affCId == null || 
+                !heldContainers.containsKey(affCId) ||
+                inUseContainers.contains(affCId)) {
+              // affinity not specified
+              // affinitized container is no longer held
+              // affinitized container is in use
+              canMatchTaskWithAffinity = false;
+            }
+            if (canMatchTaskWithAffinity) {
+              if (container.getId().equals(
+                  cookieContainerRequest.getAffinitizedContainer())) {
+                // container level match
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Matching with affinity for request: "
+                      + cookieContainerRequest + " container: " + affCId);
+                }
+                return cookieContainerRequest;
+              }
+              // the request is left for its affinitized container
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skipping request for container " + container.getId()
+                    + " due to affinity. Request: " + cookieContainerRequest
+                    + " affContainer: " + affCId);
+              }
+            } else {
+              firstMatch = cookieContainerRequest;
+            }
+          }
+        }
+      }
+    }
+    
+    return firstMatch;
+  }
+
+  /**
+   * Decides whether the request may run in the container. A container that is
+   * not held, or is held but new, accepts any request. A previously used
+   * container only accepts requests whose signature is covered by the
+   * signature of the container's first task.
+   *
+   * @return true if the request can be assigned to the container.
+   */
+  private boolean canAssignTaskToContainer(
+      CookieContainerRequest cookieContainerRequest, Container container) {
+    final HeldContainer held = heldContainers.get(container.getId());
+    if (held == null || held.isNew()) { // New container.
+      return true;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to match task to a held container, "
+          + " containerId=" + held.container.getId());
+    }
+    final boolean signatureCovered = containerSignatureMatcher.isSuperSet(
+        held.getFirstContainerSignature(),
+        cookieContainerRequest.getCookie().getContainerSignature());
+    if (LOG.isDebugEnabled()) {
+      String outcome = signatureCovered
+          ? "Matched delayed container to task"
+          : "Failed to match delayed container to task";
+      LOG.debug(outcome + " containerId=" + held.container.getId());
+    }
+    return signatureCovered;
+  }
+
+  /**
+   * @return the task stored in the cookie of the given request.
+   */
+  private Object getTask(CookieContainerRequest request) {
+    return request.getCookie().getTask();
+  }
+
+  /**
+   * Releases a container: removes its task assignment and its held container
+   * record, subtracts its resource from the allocated resources, and hands it
+   * back through the RM client. If a task had been assigned to it, the app is
+   * informed and the container is added to the released containers.
+   *
+   * @param containerId
+   *          the container to release.
+   */
+  private void releaseContainer(ContainerId containerId) {
+    Object assignedTask = containerAssignments.remove(containerId);
+    if (assignedTask != null) {
+      // A task was assigned to this container at some point. Inform the app.
+      appClientDelegate.containerBeingReleased(containerId);
+    }
+    HeldContainer delayedContainer = heldContainers.remove(containerId);
+    if (delayedContainer != null) {
+      Resources.subtractFrom(allocatedResources,
+          delayedContainer.getContainer().getResource());
+    }
+    // with reuse enabled, only containers that were actually held are released
+    if (delayedContainer != null || !shouldReuseContainers) {
+      amRmClient.releaseAssignedContainer(containerId);
+    }
+    if (assignedTask != null) {
+      // A task was assigned at some point. Add to release list since we are
+      // releasing the container.
+      releasedContainers.put(containerId, assignedTask);
+    }
+  }
+
+  /**
+   * Records that the task now runs in the container: removes the pending
+   * request of the task, stores the task/container mapping in both directions,
+   * marks the container as in use and updates its held container record.
+   *
+   * @param task
+   *          the task being assigned.
+   * @param container
+   *          the container the task is assigned to.
+   * @param assigned
+   *          the request that was matched to the container.
+   */
+  private void assignContainer(Object task,
+      Container container,
+      CookieContainerRequest assigned) {
+    CookieContainerRequest request = removeTaskRequest(task);
+    assert request != null;
+    //assert assigned.equals(request);
+
+    Container result = taskAllocations.put(task, container);
+    assert result == null;
+    inUseContainers.add(container.getId());
+    containerAssignments.put(container.getId(), task);
+    HeldContainer heldContainer = heldContainers.get(container.getId()); 
+    if (!shouldReuseContainers && heldContainer == null) {
+      // no record exists yet: create one and count the container's resource
+      heldContainers.put(container.getId(), new HeldContainer(container,
+        -1, -1, assigned));
+      Resources.addTo(allocatedResources, container.getResource());
+    } else {
+      // NOTE(review): this branch dereferences heldContainer. It is also
+      // reached when heldContainer is null and shouldReuseContainers is true
+      // - confirm callers guarantee the container is already held then.
+      if (heldContainer.isNew()) {
+        // check for existence before adding since the first container potentially
+        // has the broadest signature as subsequent uses dont expand any dimension.
+        // This will need to be enhanced to track other signatures too when we
+        // think about preferring within vertex matching etc.
+        heldContainers.put(container.getId(),
+            new HeldContainer(container, heldContainer.getNextScheduleTime(),
+                heldContainer.getContainerExpiryTime(), assigned));
+      }
+      // NOTE(review): when the record was replaced above, this call still
+      // targets the instance fetched earlier, not the one now in the map -
+      // confirm the constructor also records the last task info.
+      heldContainer.setLastTaskInfo(assigned);
+    }
+  }
+  
+  /**
+   * Registers newly received containers as held containers, counts their
+   * resources as allocated, queues them with the delayed container manager and
+   * finally triggers a scheduling pass.
+   *
+   * @param containers
+   *          the new containers.
+   * @throws TezUncheckedException
+   *           if one of the containers is already held.
+   */
+  private void pushNewContainerToDelayed(List<Container> containers){
+    // -1 is kept as the expiry time when no session delay is configured
+    long expireTime = -1;
+    if (sessionDelay > 0) {
+      long currentTime = System.currentTimeMillis();
+      expireTime = currentTime + sessionDelay;
+    }
+
+    synchronized (delayedContainerManager) {
+      for (Container container : containers) {
+        if (heldContainers.put(container.getId(), new HeldContainer(container,
+            -1, expireTime, null)) != null) {
+          throw new TezUncheckedException("New container " + container.getId()
+              + " is already held.");
+        }
+        // schedule just after the latest schedule time seen so far, or just
+        // after the current time when none has been seen yet (-1)
+        long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
+        if (delayedContainerManager.maxScheduleTimeSeen == -1) {
+          nextScheduleTime = System.currentTimeMillis();
+        }
+        Resources.addTo(allocatedResources, container.getResource());
+        delayedContainerManager.addDelayedContainer(container,
+          nextScheduleTime + 1);
+      }
+    }
+    delayedContainerManager.triggerScheduling(false);      
+  }
+
+  /**
+   * Removes the pending request of the task, both from the local bookkeeping
+   * and from the RM client.
+   *
+   * @return the removed request, or null if the task had no pending request.
+   */
+  private CookieContainerRequest removeTaskRequest(Object task) {
+    final CookieContainerRequest removed = taskRequests.remove(task);
+    if (removed == null) {
+      return null;
+    }
+    // remove all references of the request from AMRMClient
+    amRmClient.removeContainerRequest(removed);
+    return removed;
+  }
+
+  /**
+   * Stores the request as the pending request of the task and submits it to
+   * the RM client. A request previously stored for the same task is withdrawn
+   * from the RM client first.
+   */
+  private void addTaskRequest(Object task,
+                                CookieContainerRequest request) {
+    final CookieContainerRequest replaced = taskRequests.put(task, request);
+    final boolean hadEarlierRequest = (replaced != null);
+    if (hadEarlierRequest) {
+      // remove all references of the request from AMRMClient
+      amRmClient.removeContainerRequest(replaced);
+    }
+    amRmClient.addContainerRequest(request);
+  }
+
+  /**
+   * Removes the allocation of the task and marks its container as no longer
+   * in use.
+   *
+   * @return the container the task was allocated to, or null if the task had
+   *         no allocation.
+   */
+  private Container doBookKeepingForTaskDeallocate(Object task) {
+    final Container freed = taskAllocations.remove(task);
+    if (freed != null) {
+      inUseContainers.remove(freed.getId());
+    }
+    return freed;
+  }
+
+  /**
+   * Detaches the container from the task recorded for it and optionally
+   * releases the container.
+   *
+   * @param containerId
+   *          the container to un-assign.
+   * @param releaseIfFound
+   *          whether to release the container when a task is recorded for it.
+   * @return the task recorded for the container, or null if there is none.
+   */
+  private Object unAssignContainer(ContainerId containerId,
+                                    boolean releaseIfFound) {
+    // Not removing. containerAssignments tracks the last task run on a
+    // container.
+    final Object lastTask = containerAssignments.get(containerId);
+    if (lastTask != null) {
+      Container allocated = taskAllocations.remove(lastTask);
+      assert allocated != null;
+      inUseContainers.remove(containerId);
+      if (releaseIfFound) {
+        releaseContainer(containerId);
+      }
+    }
+    return lastTask;
+  }
+
+  /**
+   * Compares two priorities. A smaller numeric value is the higher priority.
+   *
+   * @return true if lhs is a strictly higher priority than rhs.
+   */
+  private boolean isHigherPriority(Priority lhs, Priority rhs) {
+    final int left = lhs.getPriority();
+    final int right = rhs.getPriority();
+    return left < right;
+  }
+
+  /**
+   * Offers each new container to the assigner. Containers that get matched to
+   * a request are recorded in assignedContainers and removed from the given
+   * iterable through its iterator.
+   */
+  private synchronized void assignNewContainersWithLocation(
+      Iterable<Container> containers,
+      ContainerAssigner assigner,
+      Map<CookieContainerRequest, Container> assignedContainers) {
+
+    for (Iterator<Container> remaining = containers.iterator(); remaining.hasNext();) {
+      Container candidate = remaining.next();
+      CookieContainerRequest matched = assigner.assignNewContainer(candidate);
+      if (matched == null) {
+        continue;
+      }
+      assignedContainers.put(matched, candidate);
+      remaining.remove();
+    }
+  }
+
+  /**
+   * Offers each reused container to the assigner. Containers that get matched
+   * to a request are removed from the given iterable through its iterator.
+   */
+  private synchronized void assignReUsedContainersWithLocation(
+      Iterable<Container> containers,
+      ContainerAssigner assigner,
+      Map<CookieContainerRequest, Container> assignedContainers,
+      boolean honorLocality) {
+
+    for (Iterator<Container> remaining = containers.iterator(); remaining.hasNext();) {
+      Container candidate = remaining.next();
+      boolean matched = assignReUsedContainerWithLocation(candidate, assigner,
+          assignedContainers, honorLocality);
+      if (matched) {
+        remaining.remove();
+      }
+    }
+  }
+
+  /**
+   * Tries to match a reused container to a pending request through the given
+   * assigner. Nothing is assigned when there is no pending request, or when
+   * comparing the top pending priority with the container's priority yields a
+   * positive result.
+   *
+   * @return true if the container was matched; the match is then recorded in
+   *         assignedContainers.
+   */
+  private synchronized boolean assignReUsedContainerWithLocation(
+    Container container,
+    ContainerAssigner assigner,
+    Map<CookieContainerRequest, Container> assignedContainers,
+    boolean honorLocality) {
+
+    final Priority containerPriority = container.getPriority();
+    final Priority topPendingTaskPriority = amRmClient.getTopPriority();
+
+    // A null top priority means nothing is left to assign.
+    //
+    // Otherwise the outcome depends on how the next task to assign compares
+    // with the container:
+    // - equal priority: first use or reuse within the same priority - safe
+    //   to use.
+    // - task is lower priority than the container: the container is no longer
+    //   needed by higher priority tasks, which have all been assigned
+    //   resources - safe to use (first use or reuse).
+    // - task is higher priority than the container: we may end up using a
+    //   container that the RM assigned for a lower priority pending task that
+    //   will be assigned after this higher priority task. If we use that
+    //   task's container now we may not be able to match this container to
+    //   that task later on. However the RM has already assigned us all
+    //   containers and is not going to give us new ones, so we would get
+    //   stuck for resources. Dont assign in this case.
+    if (topPendingTaskPriority == null
+        || topPendingTaskPriority.compareTo(containerPriority) > 0) {
+      return false;
+    }
+
+    final CookieContainerRequest matched =
+      assigner.assignReUsedContainer(container, honorLocality);
+    if (matched == null) {
+      return false;
+    }
+    assignedContainers.put(matched, container);
+    return true;
+  }
+
+  /**
+   * Logs and releases every container in the given iterable.
+   */
+  private void releaseUnassignedContainers(Iterable<Container> containers) {
+    for (Container unused : containers) {
+      final ContainerId unusedId = unused.getId();
+      LOG.info("Releasing unused container: " + unusedId);
+      releaseContainer(unusedId);
+    }
+  }
+
+  /**
+   * Tells the app that the task of the given request was allocated the
+   * container, passing along the app cookie stored in the request.
+   */
+  private void informAppAboutAssignment(CookieContainerRequest assigned,
+      Container container) {
+    final Object task = getTask(assigned);
+    final Object appCookie = assigned.getCookie().getAppCookie();
+    appClientDelegate.taskAllocated(task, appCookie, container);
+  }
+
+  /**
+   * Reports the given assignments to the app. An assignment on a blacklisted
+   * node is not reported: its container is de-allocated and the same request
+   * is submitted again.
+   *
+   * @param assignedContainers
+   *          matched requests and their containers; may be null or empty.
+   */
+  private void informAppAboutAssignments(
+      Map<CookieContainerRequest, Container> assignedContainers) {
+    if (assignedContainers == null || assignedContainers.isEmpty()) {
+      return;
+    }
+    for (Entry<CookieContainerRequest, Container> assignment : assignedContainers
+        .entrySet()) {
+      final Container container = assignment.getValue();
+      // check for blacklisted nodes. There may be race conditions between
+      // setting blacklist and receiving allocations
+      if (!blacklistedNodes.contains(container.getNodeId())) {
+        informAppAboutAssignment(assignment.getKey(), container);
+        continue;
+      }
+
+      final CookieContainerRequest request = assignment.getKey();
+      final Object task = getTask(request);
+      LOG.info("Container: " + container.getId()
+          + " allocated on blacklisted node: " + container.getNodeId()
+          + " for task: " + task);
+      Object deAllocTask = deallocateContainer(container.getId());
+      assert deAllocTask.equals(task);
+      // its ok to submit the same request again because the RM will not give us
+      // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
+      // and so its better to give the RM the full information.
+      allocateTask(
+          task,
+          request.getCapability(),
+          request.getNodes() == null
+              ? null
+              : request.getNodes().toArray(new String[request.getNodes().size()]),
+          request.getRacks() == null
+              ? null
+              : request.getRacks().toArray(new String[request.getRacks().size()]),
+          request.getPriority(),
+          request.getCookie().getContainerSignature(),
+          request.getCookie().getAppCookie());
+    }
+  }
+
+  /**
+   * Base class of the strategies that match a container to a pending request
+   * at one locality level. The locality name is only used for logging.
+   */
+  private abstract class ContainerAssigner {
+
+    protected final String locality;
+
+    protected ContainerAssigner(String locality) {
+      this.locality = locality;
+    }
+
+    /**
+     * Tries to match a new container to a pending request.
+     *
+     * @return the matched request, or null if there is none.
+     */
+    public abstract CookieContainerRequest assignNewContainer(
+        Container container);
+
+    /**
+     * Tries to match a reused container to a pending request.
+     *
+     * @return the matched request, or null if there is none.
+     */
+    public abstract CookieContainerRequest assignReUsedContainer(
+      Container container, boolean honorLocality);
+
+    /**
+     * Logs the match and records the assignment of the container to the task
+     * of the matched request. Does nothing when assigned is null.
+     */
+    public void doBookKeepingForAssignedContainer(
+        CookieContainerRequest assigned, Container container,
+        String matchedLocation, boolean honorLocalityFlags) {
+      if (assigned == null) {
+        return;
+      }
+      final Object task = getTask(assigned);
+      assert task != null;
+
+      StringBuilder message = new StringBuilder("Assigning container to task");
+      message.append(", container=").append(container);
+      message.append(", task=").append(task);
+      message.append(", containerHost=").append(container.getNodeId().getHost());
+      message.append(", localityMatchType=").append(locality);
+      message.append(", matchedLocation=").append(matchedLocation);
+      message.append(", honorLocalityFlags=").append(honorLocalityFlags);
+      message.append(", reusedContainer=")
+          .append(containerAssignments.containsKey(container.getId()));
+      message.append(", delayedContainers=")
+          .append(delayedContainerManager.delayedContainers.size());
+      message.append(", containerResourceMemory=")
+          .append(container.getResource().getMemory());
+      message.append(", containerResourceVCores=")
+          .append(container.getResource().getVirtualCores());
+      LOG.info(message.toString());
+
+      assignContainer(task, container, assigned);
+    }
+  }
+  
+  /**
+   * Matches containers against requests made for the container's own host.
+   */
+  private class NodeLocalContainerAssigner extends ContainerAssigner {
+
+    NodeLocalContainerAssigner() {
+      super("NodeLocal");
+    }
+
+    @Override
+    public CookieContainerRequest assignNewContainer(Container container) {
+      final String host = container.getNodeId().getHost();
+      final CookieContainerRequest match =
+          getMatchingRequestWithPriority(container, host);
+      doBookKeepingForAssignedContainer(match, container, host, false);
+      return match;
+    }
+
+    /**
+     * The honorLocality argument is not consulted here: a node level match is
+     * always attempted, with container affinity considered.
+     */
+    @Override
+    public CookieContainerRequest assignReUsedContainer(Container container,
+        boolean honorLocality) {
+      final String host = container.getNodeId().getHost();
+      final CookieContainerRequest match =
+          getMatchingRequestWithoutPriority(container, host, true);
+      doBookKeepingForAssignedContainer(match, container, host, true);
+      return match;
+    }
+  }
+
+  /**
+   * Matches containers against requests made for the rack that the
+   * container's host resolves to.
+   */
+  private class RackLocalContainerAssigner extends ContainerAssigner {
+
+    RackLocalContainerAssigner() {
+      super("RackLocal");
+    }
+
+    @Override
+    public CookieContainerRequest assignNewContainer(Container container) {
+      final String rack = RackResolver.resolve(container.getNodeId().getHost())
+          .getNetworkLocation();
+      final CookieContainerRequest match =
+          getMatchingRequestWithPriority(container, rack);
+      doBookKeepingForAssignedContainer(match, container, rack, false);
+      return match;
+    }
+
+    @Override
+    public CookieContainerRequest assignReUsedContainer(
+      Container container, boolean honorLocality) {
+      // TEZ-586 this does not match an actual rackLocal request unless
+      // honorLocality is false. This method is useless if honorLocality=true
+      if (honorLocality) {
+        return null;
+      }
+      final String rack = RackResolver.resolve(container.getNodeId().getHost())
+          .getNetworkLocation();
+      final CookieContainerRequest match =
+          getMatchingRequestWithoutPriority(container, rack, false);
+      doBookKeepingForAssignedContainer(match, container, rack, honorLocality);
+      return match;
+    }
+  }
+
+  /**
+   * Matches containers against requests made for any location.
+   */
+  private class NonLocalContainerAssigner extends ContainerAssigner {
+
+    NonLocalContainerAssigner() {
+      super("NonLocal");
+    }
+
+    @Override
+    public CookieContainerRequest assignNewContainer(Container container) {
+      final String anywhere = ResourceRequest.ANY;
+      final CookieContainerRequest match =
+          getMatchingRequestWithPriority(container, anywhere);
+      doBookKeepingForAssignedContainer(match, container, anywhere, false);
+      return match;
+    }
+
+    /**
+     * Only attempts a match when locality does not have to be honored.
+     */
+    @Override
+    public CookieContainerRequest assignReUsedContainer(Container container,
+        boolean honorLocality) {
+      if (honorLocality) {
+        return null;
+      }
+      final String anywhere = ResourceRequest.ANY;
+      final CookieContainerRequest match =
+          getMatchingRequestWithoutPriority(container, anywhere, false);
+      doBookKeepingForAssignedContainer(match, container, anywhere,
+          honorLocality);
+      return match;
+    }
+
+  }
+  
+  
+  @VisibleForTesting
+  class DelayedContainerManager extends Thread {
+
+    class HeldContainerTimerComparator implements Comparator<HeldContainer> {
+      // Orders by nextScheduleTime; compared explicitly because casting a long difference to int can flip the sign.
+      @Override
+      public int compare(HeldContainer c1, HeldContainer c2) {
+        long t1 = c1.getNextScheduleTime(), t2 = c2.getNextScheduleTime();
+        return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0);
+      }
+    }
+
+    PriorityBlockingQueue<HeldContainer> delayedContainers =
+      new PriorityBlockingQueue<HeldContainer>(20,
+        new HeldContainerTimerComparator());
+
+    private volatile boolean tryAssigningAll = false;
+    private volatile boolean running = true;
+    private long maxScheduleTimeSeen = -1;
+    
+    // used for testing only
+    @VisibleForTesting
+    volatile AtomicBoolean drainedDelayedContainersForTest = null;
+
+    DelayedContainerManager() {
+      super.setName("DelayedContainerManager");
+    }
+    
+    /**
+     * Main loop of the delayed-container thread. While {@code running} is
+     * true, each pass:
+     * <ol>
+     * <li>runs doAssignAll() if tryAssigningAll is set, then clears the flag;</li>
+     * <li>if the delayed queue is empty, waits on this object's monitor until
+     * notified or interrupted;</li>
+     * <li>otherwise looks at the head of the queue: if its next schedule time
+     * has been reached it is removed and, when still present in
+     * heldContainers, handed to assignDelayedContainer(); if not, the thread
+     * waits until that time (or until notified).</li>
+     * </ol>
+     * When the loop exits, whatever is left in the queue is released via
+     * releasePendingContainers().
+     */
+    @Override
+    public void run() {
+      while(running) {
+        // Try assigning all containers if there's a request to do so.
+        if (tryAssigningAll) {
+          doAssignAll();
+          tryAssigningAll = false;
+        }
+
+        // Try allocating containers which have timed out.
+        // Required since these containers may get assigned without
+        // locality at this point.
+        if (delayedContainers.peek() == null) {
+          try {
+            // test only signaling to make TestTaskScheduler work
+            if (drainedDelayedContainersForTest != null) {
+              drainedDelayedContainersForTest.set(true);
+              synchronized (drainedDelayedContainersForTest) {
+                drainedDelayedContainersForTest.notifyAll();
+              }
+            }
+            synchronized(this) {
+              this.wait();
+            }
+            // Re-loop to see if tryAssignAll is set.
+            continue;
+          } catch (InterruptedException e) {
+            // Fall through to the loop condition; shutdown() clears 'running'
+            // before interrupting, so the loop ends in that case.
+            LOG.info("AllocatedContainerManager Thread interrupted");
+          }
+        } else {
+          // test only sleep to prevent tight loop cycling that makes tests stall
+          if (drainedDelayedContainersForTest != null) {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              // Report through the logger rather than dumping to stderr.
+              LOG.warn("DelayedContainerManager test-only sleep interrupted", e);
+            }
+          }
+          HeldContainer delayedContainer = delayedContainers.peek();
+          if (delayedContainer == null) {
+            continue;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Considering HeldContainer: "
+              + delayedContainer + " for assignment");
+          }
+          long currentTs = System.currentTimeMillis();
+          long nextScheduleTs = delayedContainer.getNextScheduleTime();
+          if (currentTs >= nextScheduleTs) {
+            // Remove the container and try scheduling it.
+            // TEZ-587 what if container is released by RM after this
+            // in onContainerCompleted()
+            delayedContainer = delayedContainers.poll();
+            if (delayedContainer == null) {
+              continue;
+            }
+            Map<CookieContainerRequest, Container> assignedContainers = null;
+            synchronized(YarnTaskSchedulerService.this) {
+              // Only containers still tracked in heldContainers are assigned.
+              if (null !=
+                  heldContainers.get(delayedContainer.getContainer().getId())) {
+                assignedContainers = assignDelayedContainer(delayedContainer);
+              } else {
+                LOG.info("Skipping delayed container as container is no longer"
+                  + " running, containerId="
+                  + delayedContainer.getContainer().getId());
+              }
+            }
+            // Inform App should be done outside of the lock
+            informAppAboutAssignments(assignedContainers);
+          } else {
+            synchronized(this) {
+              try {
+                // Wait for the next container to be assignable
+                delayedContainer = delayedContainers.peek();
+                // Falls back to localitySchedulingDelay if the queue was
+                // emptied since the earlier peek.
+                long diff = localitySchedulingDelay;
+                if (delayedContainer != null) {
+                  diff = delayedContainer.getNextScheduleTime() - currentTs;
+                }
+                if (diff > 0) {
+                  this.wait(diff);
+                }
+              } catch (InterruptedException e) {
+                LOG.info("AllocatedContainerManager Thread interrupted");
+              }
+            }
+          }
+        }
+      }
+      releasePendingContainers();
+    }
+    
+    /**
+     * Attempts to assign every container currently in the delayed queue.
+     * Returns immediately if the queue is empty. Otherwise, under the
+     * YarnTaskSchedulerService.this lock, drops queue entries that are no
+     * longer present in heldContainers and passes the remaining ones to
+     * tryAssignReUsedContainers(); the resulting assignments are reported via
+     * informAppAboutAssignments() after the lock is released.
+     */
+    private void doAssignAll() {
+      // The allocatedContainers queue should not be modified in the middle of an iteration over it.
+      // Synchronizing here on YarnTaskSchedulerService.this to prevent this from happening.
+      // The call to assignAll from within this method should NOT add any
+      // elements back to the allocatedContainers list. Since they're all
+      // delayed elements, de-allocation should not happen either - leaving the
+      // list of delayed containers intact, except for the containers which end
+      // up getting assigned.
+      if (delayedContainers.isEmpty()) {
+        return;
+      }
+
+      Map<CookieContainerRequest, Container> assignedContainers;
+      synchronized(YarnTaskSchedulerService.this) {
+        // honor reuse-locality flags (container not timed out yet), Don't queue
+        // (already in queue), don't release (release happens when containers
+        // time-out)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to assign all delayed containers to newly received"
+            + " tasks");
+        }
+        // First pass: prune queue entries that are no longer in heldContainers.
+        Iterator<HeldContainer> iter = delayedContainers.iterator();
+        while(iter.hasNext()) {
+          HeldContainer delayedContainer = iter.next();
+          if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
+            // this container is no longer held by us
+            LOG.info("AssignAll - Skipping delayed container as container is no longer"
+                + " running, containerId="
+                + delayedContainer.getContainer().getId());
+            iter.remove();
+          }
+        }
+        // Second pass: offer the remaining delayed containers for re-use.
+        assignedContainers = tryAssignReUsedContainers(
+          new ContainerIterable(delayedContainers));
+      }
+      // Inform app
+      informAppAboutAssignments(assignedContainers);
+    }
+    
+    /**
+     * Records whether the next pass of run() should attempt to assign all
+     * delayed containers, then wakes up a thread waiting on this manager's
+     * monitor.
+     *
+     * @param scheduleAll value stored into the tryAssigningAll flag
+     */
+    public void triggerScheduling(boolean scheduleAll) {
+      tryAssigningAll = scheduleAll;
+      synchronized (this) {
+        notify();
+      }
+    }
+
+    /**
+     * Clears the running flag checked by run() and then interrupts this
+     * thread.
+     */
+    public void shutdown() {
+      running = false;
+      interrupt();
+    }
+    
+    /**
+     * Drains the delayed queue into a list and passes the drained containers
+     * to releaseUnassignedContainers().
+     */
+    private void releasePendingContainers() {
+      final int pendingCount = delayedContainers.size();
+      final List<HeldContainer> drained =
+          Lists.newArrayListWithCapacity(pendingCount);
+      delayedContainers.drainTo(drained);
+      releaseUnassignedContainers(new ContainerIterable(drained));
+    }
+
+    /**
+     * Queues a held container for a later assignment attempt.
+     * If the container is not present in heldContainers a warning is logged
+     * and nothing is queued. Otherwise its next schedule time is updated, it
+     * is offered to the delayed queue and a waiter on this manager's monitor
+     * is notified. If the queue rejects the offer, the container is passed to
+     * releaseUnassignedContainers().
+     *
+     * @param container container to queue
+     * @param nextScheduleTime value stored as the container's next schedule time
+     */
+    private void addDelayedContainer(Container container,
+        long nextScheduleTime) {
+      final HeldContainer held = heldContainers.get(container.getId());
+      if (held == null) {
+        LOG.warn("Attempting to add a non-running container to the"
+            + " delayed container list, containerId=" + container.getId());
+        return;
+      }
+      held.setNextScheduleTime(nextScheduleTime);
+      if (nextScheduleTime > maxScheduleTimeSeen) {
+        maxScheduleTimeSeen = nextScheduleTime;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding container to delayed queue"
+          + ", containerId=" + held.getContainer().getId()
+          + ", nextScheduleTime=" + held.getNextScheduleTime()
+          + ", containerExpiry=" + held.getContainerExpiryTime());
+      }
+      final boolean queued = delayedContainers.offer(held);
+      // Wake the manager thread so it re-evaluates the head of the queue.
+      synchronized (this) {
+        notify();
+      }
+      if (!queued) {
+        releaseUnassignedContainers(Lists.newArrayList(container));
+      }
+    }
+
+  }
+
+  /**
+   * Adapts an Iterable of HeldContainer into an Iterable of the wrapped
+   * Container objects. Removal through the returned iterator is forwarded to
+   * the underlying iterator.
+   */
+  private class ContainerIterable implements Iterable<Container> {
+
+    private final Iterable<HeldContainer> source;
+
+    ContainerIterable(Iterable<HeldContainer> delayedContainers) {
+      this.source = delayedContainers;
+    }
+
+    @Override
+    public Iterator<Container> iterator() {
+      final Iterator<HeldContainer> heldIterator = source.iterator();
+      return new Iterator<Container>() {
+
+        @Override
+        public boolean hasNext() {
+          return heldIterator.hasNext();
+        }
+
+        @Override
+        public Container next() {
+          // Unwrap the next held entry.
+          final HeldContainer held = heldIterator.next();
+          return held.getContainer();
+        }
+
+        @Override
+        public void remove() {
+          heldIterator.remove();
+        }
+      };
+    }
+  }
+
+  /**
+   * Per-container record kept by the scheduler: the container itself, when it
+   * should next be considered for scheduling, its expiry time, the signature
+   * and request of the task it was first/last used for, the current locality
+   * match level and a count of assignment attempts.
+   */
+  static class HeldContainer {
+
+    /** Locality levels, in the order incrementLocalityMatchLevel() steps through them. */
+    enum LocalityMatchLevel {
+      NEW,
+      NODE,
+      RACK,
+      NON_LOCAL
+    }
+
+    Container container;
+    private long nextScheduleTime;
+    private Object firstContainerSignature;
+    private LocalityMatchLevel localityMatchLevel;
+    private long containerExpiryTime;
+    private CookieContainerRequest lastTaskInfo;
+    private int numAssignmentAttempts = 0;
+    
+    /**
+     * @param container the held container
+     * @param nextScheduleTime initial next schedule time
+     * @param containerExpiryTime initial expiry time
+     * @param firstTaskInfo request the container was first used for, or null;
+     *          when non-null it also supplies the first container signature
+     */
+    HeldContainer(Container container,
+        long nextScheduleTime,
+        long containerExpiryTime,
+        CookieContainerRequest firstTaskInfo) {
+      this.container = container;
+      this.nextScheduleTime = nextScheduleTime;
+      this.containerExpiryTime = containerExpiryTime;
+      // The level starts at NODE, not NEW.
+      this.localityMatchLevel = LocalityMatchLevel.NODE;
+      if (firstTaskInfo != null) {
+        this.lastTaskInfo = firstTaskInfo;
+        this.firstContainerSignature =
+            firstTaskInfo.getCookie().getContainerSignature();
+      }
+    }
+    
+    /** @return true when no first container signature has been recorded */
+    boolean isNew() {
+      return null == firstContainerSignature;
+    }
+    
+    /** @return number of times incrementAssignmentAttempts() has been called */
+    int geNumAssignmentAttempts() {
+      return this.numAssignmentAttempts;
+    }
+    
+    void incrementAssignmentAttempts() {
+      this.numAssignmentAttempts += 1;
+    }
+    
+    public Container getContainer() {
+      return container;
+    }
+    
+    public long getNextScheduleTime() {
+      return nextScheduleTime;
+    }
+    
+    public void setNextScheduleTime(long nextScheduleTime) {
+      this.nextScheduleTime = nextScheduleTime;
+    }
+
+    public long getContainerExpiryTime() {
+      return containerExpiryTime;
+    }
+
+    public void setContainerExpiryTime(long containerExpiryTime) {
+      this.containerExpiryTime = containerExpiryTime;
+    }
+
+    public Object getFirstContainerSignature() {
+      return firstContainerSignature;
+    }
+    
+    public CookieContainerRequest getLastTaskInfo() {
+      return lastTaskInfo;
+    }
+    
+    public void setLastTaskInfo(CookieContainerRequest taskInfo) {
+      this.lastTaskInfo = taskInfo;
+    }
+
+    /** Sets the locality match level back to NEW. */
+    public synchronized void resetLocalityMatchLevel() {
+      this.localityMatchLevel = LocalityMatchLevel.NEW;
+    }
+
+    /**
+     * Advances the level one step: NEW to NODE, NODE to RACK, RACK to
+     * NON_LOCAL.
+     *
+     * @throws TezUncheckedException if the level is already NON_LOCAL
+     */
+    public synchronized void incrementLocalityMatchLevel() {
+      switch (localityMatchLevel) {
+        case NEW:
+          localityMatchLevel = LocalityMatchLevel.NODE;
+          break;
+        case NODE:
+          localityMatchLevel = LocalityMatchLevel.RACK;
+          break;
+        case RACK:
+          localityMatchLevel = LocalityMatchLevel.NON_LOCAL;
+          break;
+        case NON_LOCAL:
+          throw new TezUncheckedException("Cannot increment locality level "
+            + " from current NON_LOCAL for container: " + container.getId());
+        default:
+          break;
+      }
+    }
+
+    public LocalityMatchLevel getLocalityMatchLevel() {
+      return localityMatchLevel;
+    }
+
+    @Override
+    public String toString() {
+      final String signatureText = (firstContainerSignature == null)
+          ? "null" : firstContainerSignature.toString();
+      return "HeldContainer: id: " + container.getId()
+          + ", nextScheduleTime: " + nextScheduleTime
+          + ", localityMatchLevel=" + localityMatchLevel
+          + ", signature: "
+          + signatureText;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 307af71..c5d20d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -66,9 +66,9 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 1fb393f..c6e83c0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Priority;
 
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.AsyncDelegateRequestHandler;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.LocalContainerFactory;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.TaskRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.TaskRequest;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 
 public class TestLocalTaskScheduler {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 9652da4..2e94dc0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -64,10 +64,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.HeldContainer;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
@@ -1311,9 +1311,9 @@ public class TestTaskScheduler {
     matchingMap.put(hostsTask1[0], host1List);
     matchingMap.put(defaultRack[0], defaultRackList);
 
-    List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<TaskScheduler.CookieContainerRequest>();
+    List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
     nonAllocatedHostList.add(mockCookie2);
-    List<CookieContainerRequest> otherRackList = new ArrayList<TaskScheduler.CookieContainerRequest>();
+    List<CookieContainerRequest> otherRackList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
     otherRackList.add(mockCookie2);
     taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack,
         priority, null, mockCookie2);
@@ -1321,7 +1321,7 @@ public class TestTaskScheduler {
     matchingMap.put(hostsTask2[0], nonAllocatedHostList);
     matchingMap.put(otherRack[0], otherRackList);
 
-    List<CookieContainerRequest> anyList = new LinkedList<TaskScheduler.CookieContainerRequest>();
+    List<CookieContainerRequest> anyList = new LinkedList<YarnTaskSchedulerService.CookieContainerRequest>();
     anyList.add(mockCookie1);
     anyList.add(mockCookie2);
 


[3/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an Inteface. Contributed by Jeff Zhang.

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
deleted file mode 100644
index 67369ee..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ /dev/null
@@ -1,1985 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/* TODO not yet updating cluster nodes on every allocate response
- * from RMContainerRequestor
-   import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
-    if (clusterNmCount != lastClusterNmCount) {
-      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
-          + clusterNmCount);
-      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
-    }
- */
-public class TaskScheduler extends AbstractService
-                             implements AMRMClientAsync.CallbackHandler, TaskSchedulerInterface {
-  private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
-
-  public interface TaskSchedulerAppCallback {
-    public class AppFinalStatus {
-      public final FinalApplicationStatus exitStatus;
-      public final String exitMessage;
-      public final String postCompletionTrackingUrl;
-      public AppFinalStatus(FinalApplicationStatus exitStatus,
-                             String exitMessage,
-                             String posCompletionTrackingUrl) {
-        this.exitStatus = exitStatus;
-        this.exitMessage = exitMessage;
-        this.postCompletionTrackingUrl = posCompletionTrackingUrl;
-      }
-    }
-    // upcall to app must be outside locks
-    public void taskAllocated(Object task,
-                               Object appCookie,
-                               Container container);
-    // this may end up being called for a task+container pair that the app
-    // has not heard about. this can happen because of a race between
-    // taskAllocated() upcall and deallocateTask() downcall
-    public void containerCompleted(Object taskLastAllocated,
-                                    ContainerStatus containerStatus);
-    public void containerBeingReleased(ContainerId containerId);
-    public void nodesUpdated(List<NodeReport> updatedNodes);
-    public void appShutdownRequested();
-    public void setApplicationRegistrationData(
-                                Resource maxContainerCapability,
-                                Map<ApplicationAccessType, String> appAcls,
-                                ByteBuffer clientAMSecretKey
-                                );
-    public void onError(Throwable t);
-    public float getProgress();
-    public void preemptContainer(ContainerId containerId);
-    public AppFinalStatus getFinalAppStatus();
-  }
-
-  final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
-  final TaskSchedulerAppCallback realAppClient;
-  final TaskSchedulerAppCallback appClientDelegate;
-  final ContainerSignatureMatcher containerSignatureMatcher;
-  ExecutorService appCallbackExecutor;
-
-  // Container Re-Use configuration
-  private boolean shouldReuseContainers;
-  private boolean reuseRackLocal;
-  private boolean reuseNonLocal;
-
-  Map<Object, CookieContainerRequest> taskRequests =
-                  new HashMap<Object, CookieContainerRequest>();
-  // LinkedHashMap is need in getProgress()
-  LinkedHashMap<Object, Container> taskAllocations =
-                  new LinkedHashMap<Object, Container>();
-  /**
-   * Tracks last task assigned to a known container.
-   */
-  Map<ContainerId, Object> containerAssignments =
-                  new HashMap<ContainerId, Object>();
-  // Remove inUse depending on resolution of TEZ-1129
-  Set<ContainerId> inUseContainers = Sets.newHashSet(); 
-  HashMap<ContainerId, Object> releasedContainers =
-                  new HashMap<ContainerId, Object>();
-  /**
-   * Map of containers currently being held by the TaskScheduler.
-   */
-  Map<ContainerId, HeldContainer> heldContainers =
-      new HashMap<ContainerId, HeldContainer>();
-  
-  Set<Priority> priorityHasAffinity = Sets.newHashSet();
-  
-  Set<NodeId> blacklistedNodes = Collections
-      .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
-  
-  Resource totalResources = Resource.newInstance(0, 0);
-  Resource allocatedResources = Resource.newInstance(0, 0);
-  
-  final String appHostName;
-  final int appHostPort;
-  final String appTrackingUrl;
-  final AppContext appContext;
-
-  AtomicBoolean isStopped = new AtomicBoolean(false);
-
-  private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
-  private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
-  private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
-
-  DelayedContainerManager delayedContainerManager;
-  long localitySchedulingDelay;
-  long sessionDelay;
-
-  @VisibleForTesting
-  protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
-
-  class CRCookie {
-    // Do not use these variables directly. Can caused mocked unit tests to fail.
-    private Object task;
-    private Object appCookie;
-    private Object containerSignature;
-    
-    CRCookie(Object task, Object appCookie, Object containerSignature) {
-      this.task = task;
-      this.appCookie = appCookie;
-      this.containerSignature = containerSignature;
-    }
-
-    Object getTask() {
-      return task;
-    }
-
-    Object getAppCookie() {
-      return appCookie;
-    }
-    
-    Object getContainerSignature() {
-      return containerSignature;
-    }
-  }
-
-  class CookieContainerRequest extends ContainerRequest {
-    CRCookie cookie;
-    ContainerId affinitizedContainerId;
-
-    public CookieContainerRequest(
-        Resource capability,
-        String[] hosts,
-        String[] racks,
-        Priority priority,
-        CRCookie cookie) {
-      super(capability, hosts, racks, priority);
-      this.cookie = cookie;
-    }
-
-    public CookieContainerRequest(
-        Resource capability,
-        ContainerId containerId,
-        String[] hosts,
-        String[] racks,
-        Priority priority,
-        CRCookie cookie) {
-      this(capability, hosts, racks, priority, cookie);
-      this.affinitizedContainerId = containerId;
-    }
-
-    CRCookie getCookie() {
-      return cookie;
-    }
-    
-    ContainerId getAffinitizedContainer() {
-      return affinitizedContainerId;
-    }
-  }
-
-  public TaskScheduler(TaskSchedulerAppCallback appClient,
-                        ContainerSignatureMatcher containerSignatureMatcher,
-                        String appHostName,
-                        int appHostPort,
-                        String appTrackingUrl,
-                        AppContext appContext) {
-    super(TaskScheduler.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
-  }
-
-  @Private
-  @VisibleForTesting
-  TaskScheduler(TaskSchedulerAppCallback appClient,
-      ContainerSignatureMatcher containerSignatureMatcher,
-      String appHostName,
-      int appHostPort,
-      String appTrackingUrl,
-      TezAMRMClientAsync<CookieContainerRequest> client,
-      AppContext appContext) {
-    super(TaskScheduler.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.amRmClient = client;
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
-  }
-
-  @VisibleForTesting
-  ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-  
-  @Override
-  public Resource getAvailableResources() {
-    return amRmClient.getAvailableResources();
-  }
-
-  @Override
-  public int getClusterNodeCount() {
-    // this can potentially be cheaper after YARN-1722
-    return amRmClient.getClusterNodeCount();
-  }
-
-  TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
-  }
-
-  @Override
-  public void setShouldUnregister() {
-    this.shouldUnregister.set(true);
-  }
-
-  // AbstractService methods
-  /**
-   * Initializes the RM client and reads the scheduler settings from the
-   * configuration: heartbeat interval, container re-use flags, locality
-   * scheduling delay and session delay. Also creates the delayed container
-   * manager.
-   *
-   * @param conf configuration to read the settings from
-   * @throws IllegalArgumentException (via {@code Preconditions}) if non-local
-   *         re-use is enabled while rack-local re-use is disabled, or if a
-   *         delay value is out of range
-   */
-  @Override
-  public synchronized void serviceInit(Configuration conf) {
-    amRmClient.init(conf);
-    final int maxHeartbeatMs = conf.getInt(
-        TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
-        TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
-    amRmClient.setHeartbeatInterval(maxHeartbeatMs);
-
-    // container re-use switches
-    shouldReuseContainers = conf.getBoolean(
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
-    reuseRackLocal = conf.getBoolean(
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
-    reuseNonLocal = conf.getBoolean(
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
-    // non-local re-use is only valid when rack-local re-use is on as well
-    Preconditions.checkArgument(reuseRackLocal || !reuseNonLocal,
-        "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
-        + " enabled");
-
-    localitySchedulingDelay = conf.getLong(
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
-        TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
-    Preconditions.checkArgument(localitySchedulingDelay >= 0,
-        "Locality Scheduling delay should be >=0");
-
-    sessionDelay = conf.getLong(
-        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS,
-        TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT);
-    Preconditions.checkArgument(sessionDelay == -1 || sessionDelay >= 0,
-        "Session delay should be either -1 or >=0");
-
-    delayedContainerManager = new DelayedContainerManager();
-
-    StringBuilder summary = new StringBuilder(
-        "TaskScheduler initialized with configuration: ");
-    summary.append("maxRMHeartbeatInterval: ").append(maxHeartbeatMs);
-    summary.append(", containerReuseEnabled: ").append(shouldReuseContainers);
-    summary.append(", reuseRackLocal: ").append(reuseRackLocal);
-    summary.append(", reuseNonLocal: ").append(reuseNonLocal);
-    summary.append(", localitySchedulingDelay: ").append(localitySchedulingDelay);
-    summary.append(", sessionDelay=").append(sessionDelay);
-    LOG.info(summary.toString());
-  }
-
-  /**
-   * Starts the RM client, registers the application master, hands the
-   * registration data to the application and starts the delayed container
-   * manager.
-   *
-   * @throws TezUncheckedException wrapping any YarnException or IOException
-   *         raised while starting or registering
-   */
-  @Override
-  public void serviceStart() {
-    final RegisterApplicationMasterResponse registration;
-    try {
-      synchronized (this) {
-        amRmClient.start();
-        registration = amRmClient.registerApplicationMaster(
-            appHostName, appHostPort, appTrackingUrl);
-      }
-      // upcall to app outside locks
-      appClientDelegate.setApplicationRegistrationData(
-          registration.getMaximumResourceCapability(),
-          registration.getApplicationACLs(),
-          registration.getClientToAMTokenMasterKey());
-
-      delayedContainerManager.start();
-    } catch (YarnException yarnFailure) {
-      LOG.error("Yarn Exception while registering", yarnFailure);
-      throw new TezUncheckedException(yarnFailure);
-    } catch (IOException ioFailure) {
-      LOG.error("IO Exception while registering", ioFailure);
-      throw new TezUncheckedException(ioFailure);
-    }
-  }
-
-  /**
-   * Stops the scheduler: shuts down the delayed container manager, marks the
-   * scheduler stopped, unregisters from the RM if that was requested, then
-   * stops the RM client and the callback executor.
-   *
-   * @throws InterruptedException if interrupted while waiting for the delayed
-   *         container manager or the callback executor
-   * @throws TezUncheckedException wrapping any YarnException or IOException
-   *         raised while unregistering
-   */
-  @Override
-  public void serviceStop() throws InterruptedException {
-    // upcall to app outside of locks
-    try {
-      delayedContainerManager.shutdown();
-      // Wait for containers to be released.
-      delayedContainerManager.join(2000L);
-      synchronized (this) {
-        isStopped.set(true);
-        if (shouldUnregister.get()) {
-          AppFinalStatus finalStatus = appClientDelegate.getFinalAppStatus();
-          LOG.info("Unregistering application from RM"
-              + ", exitStatus=" + finalStatus.exitStatus
-              + ", exitMessage=" + finalStatus.exitMessage
-              + ", trackingURL=" + finalStatus.postCompletionTrackingUrl);
-          amRmClient.unregisterApplicationMaster(
-              finalStatus.exitStatus,
-              finalStatus.exitMessage,
-              finalStatus.postCompletionTrackingUrl);
-        }
-      }
-
-      // Call client.stop() without holding our lock: the client will try to
-      // stop its callback operation, and that callback may at the same time
-      // be trying to acquire our lock.
-      amRmClient.stop();
-      appCallbackExecutor.shutdown();
-      appCallbackExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
-    } catch (YarnException yarnFailure) {
-      LOG.error("Yarn Exception while unregistering ", yarnFailure);
-      throw new TezUncheckedException(yarnFailure);
-    } catch (IOException ioFailure) {
-      LOG.error("IOException while unregistering ", ioFailure);
-      throw new TezUncheckedException(ioFailure);
-    }
-  }
-
-  // AMRMClientAsync interface methods
-  /**
-   * Handles container-completion reports from the RM.
-   *
-   * For each status the container is looked up first among containers this
-   * scheduler already released and then among currently assigned containers.
-   * Completions that map to a task are collected and reported to the
-   * application after the lock is dropped; containers that are in neither set
-   * are ignored. Nothing is done once the scheduler has been stopped.
-   *
-   * @param statuses completion statuses reported by the RM
-   */
-  @Override
-  public void onContainersCompleted(List<ContainerStatus> statuses) {
-    if (isStopped.get()) {
-      return;
-    }
-    Map<Object, ContainerStatus> appContainerStatus =
-                        new HashMap<Object, ContainerStatus>(statuses.size());
-    synchronized (this) {
-      for(ContainerStatus containerStatus : statuses) {
-        ContainerId completedId = containerStatus.getContainerId();
-        HeldContainer delayedContainer = heldContainers.get(completedId);
-
-        Object task = releasedContainers.remove(completedId);
-        if(task != null){
-          if (delayedContainer != null) {
-            LOG.warn("Held container should be null since releasedContainer is not");
-          }
-          // TODO later we may want to check if exit code matched expectation
-          // e.g. successful container should not come back fail exit code after
-          // being released
-          // completion of a container we had released earlier
-          // an allocated container completed. notify app
-          LOG.info("Released container completed:" + completedId +
-                   " last allocated to task: " + task);
-          appContainerStatus.put(task, containerStatus);
-          continue;
-        }
-
-        // not found in released containers. check currently allocated containers
-        // no need to release this container as the RM has already completed it
-        task = unAssignContainer(completedId, false);
-        if (delayedContainer != null) {
-          heldContainers.remove(completedId);
-          // Resources.subtract() returns a new Resource and leaves its first
-          // argument untouched, so the accounting has to be updated in place
-          // with subtractFrom().
-          Resources.subtractFrom(allocatedResources,
-              delayedContainer.getContainer().getResource());
-        } else {
-          LOG.warn("Held container expected to be not null for a non-AM-released container");
-        }
-        if(task != null) {
-          // completion of a container we have allocated currently
-          // an allocated container completed. notify app
-          LOG.info("Allocated container completed:" + completedId +
-                   " last allocated to task: " + task);
-          appContainerStatus.put(task, containerStatus);
-          continue;
-        }
-
-        // container neither allocated nor released
-        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
-      }
-    }
-
-    // upcall to app must be outside locks
-    for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
-      appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
-    }
-  }
-
-  /**
-   * Handles newly allocated containers from the RM.
-   *
-   * When container re-use is enabled the containers are handed to the delayed
-   * container path; otherwise they are matched against pending requests
-   * immediately and the application is told about the resulting assignments.
-   * Nothing is done once the scheduler has been stopped.
-   *
-   * @param containers containers allocated by the RM
-   */
-  @Override
-  public void onContainersAllocated(List<Container> containers) {
-    if (isStopped.get()) {
-      return;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      StringBuilder ids = new StringBuilder();
-      for (Container allocated : containers) {
-        ids.append(allocated.getId());
-        ids.append(", ");
-      }
-      LOG.debug("Assigned New Containers: " + ids.toString());
-    }
-
-    Map<CookieContainerRequest, Container> assignments;
-    synchronized (this) {
-      if (shouldReuseContainers) {
-        // unify allocations
-        pushNewContainerToDelayed(containers);
-        return;
-      }
-      // the assignment pass may modify the list, so work on a private copy
-      List<Container> mutableCopy = Lists.newLinkedList(containers);
-      assignments = assignNewlyAllocatedContainers(mutableCopy);
-    }
-
-    // upcall to app must be outside locks
-    informAppAboutAssignments(assignments);
-  }
-
-  /**
-   * Matches freshly allocated containers against pending requests, trying
-   * the node-local, rack-local and non-local assigners in that order, and
-   * then releases whatever is left in {@code containers}.
-   *
-   * @param containers
-   *          The containers to assign. The collection *may* be modified in
-   *          place based on allocations and releases.
-   * @return Assignments.
-   */
-  private synchronized Map<CookieContainerRequest, Container>
-      assignNewlyAllocatedContainers(Iterable<Container> containers) {
-
-    Map<CookieContainerRequest, Container> assignments =
-        new HashMap<CookieContainerRequest, Container>();
-
-    // widen the locality level one step at a time
-    assignNewContainersWithLocation(
-        containers, NODE_LOCAL_ASSIGNER, assignments);
-    assignNewContainersWithLocation(
-        containers, RACK_LOCAL_ASSIGNER, assignments);
-    assignNewContainersWithLocation(
-        containers, NON_LOCAL_ASSIGNER, assignments);
-
-    // Release any unassigned containers given by the RM
-    releaseUnassignedContainers(containers);
-
-    return assignments;
-  }
-
-  /**
-   * Matches re-used containers against pending requests, trying the
-   * node-local, rack-local and non-local assigners in that order. Unlike the
-   * path for new containers, nothing is released here.
-   *
-   * @param containers the re-used containers to assign
-   * @return Assignments.
-   */
-  private synchronized Map<CookieContainerRequest, Container>
-      tryAssignReUsedContainers(Iterable<Container> containers) {
-
-    Map<CookieContainerRequest, Container> assignments =
-        new HashMap<CookieContainerRequest, Container>();
-
-    // Honor locality and match as many as possible
-    assignReUsedContainersWithLocation(
-        containers, NODE_LOCAL_ASSIGNER, assignments, true);
-    assignReUsedContainersWithLocation(
-        containers, RACK_LOCAL_ASSIGNER, assignments, true);
-    assignReUsedContainersWithLocation(
-        containers, NON_LOCAL_ASSIGNER, assignments, true);
-
-    return assignments;
-  }
-
-  /**
-   * Try to assign a held (new or re-used) container to a pending task.
-   *
-   * Behaviour depends on the AM state read from {@code appContext}:
-   * <ul>
-   * <li>IDLE, or no pending task requests: the container's locality match
-   * level is reset; the container is then either released or put back on the
-   * delayed queue (the latter only for sessions).</li>
-   * <li>RUNNING: matching is attempted at node, then rack, then non-local
-   * level as permitted by the configuration and the container's current match
-   * level. If nothing matches, the container is released or re-queued for a
-   * later attempt.</li>
-   * <li>Any other state: the container is released.</li>
-   * </ul>
-   *
-   * @param heldContainer Container to be used to assign to tasks
-   * @return Assigned container map (possibly empty) when the AM is RUNNING and
-   *         there are pending requests; {@code null} in all other cases
-   */
-  private synchronized Map<CookieContainerRequest, Container>
-      assignDelayedContainer(HeldContainer heldContainer) {
-
-    DAGAppMasterState state = appContext.getAMState();
-    boolean isNew = heldContainer.isNew();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to assign a delayed container"
-        + ", containerId=" + heldContainer.getContainer().getId()
-        + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
-        + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
-        + ", AMState=" + state
-        + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
-        + ", taskRequestsCount=" + taskRequests.size()
-        + ", heldContainers=" + heldContainers.size()
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", isNew=" + isNew);
-    }
-
-    if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
-      // reset locality level on held container
-      // if sessionDelay defined, push back into delayed queue if not already
-      // done so
-
-      heldContainer.resetLocalityMatchLevel();
-      long currentTime = System.currentTimeMillis();
-      // sessionDelay == -1 disables expiry-based release; a brand new
-      // container with nothing to run is released regardless
-      if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
-          && sessionDelay != -1)) {
-        LOG.info("No taskRequests. Container's session delay expired or is new. " +
-        	"Releasing container"
-          + ", containerId=" + heldContainer.container.getId()
-          + ", containerExpiryTime="
-          + heldContainer.getContainerExpiryTime()
-          + ", sessionDelay=" + sessionDelay
-          + ", taskRequestsCount=" + taskRequests.size()
-          + ", heldContainers=" + heldContainers.size()
-          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-          + ", isNew=" + isNew);
-        releaseUnassignedContainers(
-            Lists.newArrayList(heldContainer.container));
-      } else {
-        if (!appContext.isSession()) {
-          releaseUnassignedContainers(
-            Lists.newArrayList(heldContainer.container));
-        } else {
-          // only put back in queue if this is a session
-          heldContainer.resetLocalityMatchLevel();
-          delayedContainerManager.addDelayedContainer(
-            heldContainer.getContainer(),
-            currentTime + localitySchedulingDelay);
-        }
-      }
-    } else if (state.equals(DAGAppMasterState.RUNNING)) {
-      // Snapshot of the match level taken before any increment further down;
-      // the rack / non-local checks below all use this snapshot.
-      HeldContainer.LocalityMatchLevel localityMatchLevel =
-        heldContainer.getLocalityMatchLevel();
-      Map<CookieContainerRequest, Container> assignedContainers =
-        new HashMap<CookieContainerRequest, Container>();
-
-      Container containerToAssign = heldContainer.container;
-
-      heldContainer.incrementAssignmentAttempts();
-      // Each time a container is seen, we try node, rack and non-local in that
-      // order depending on matching level allowed
-
-      // if match level is NEW or NODE, match only at node-local
-      // always try node local matches for other levels
-      // NOTE(review): the condition names NEW, NODE, RACK and NON_LOCAL; if
-      // those are all of the enum's values it is always true - confirm.
-      if (isNew
-          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
-          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
-          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
-          || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
-        assignReUsedContainerWithLocation(containerToAssign,
-            NODE_LOCAL_ASSIGNER, assignedContainers, true);
-        // NOTE(review): guarded by isDebugEnabled() but logged at INFO level
-        if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-          LOG.info("Failed to assign tasks to delayed container using node"
-            + ", containerId=" + heldContainer.getContainer().getId());
-        }
-      }
-
-      // if re-use allowed at rack
-      // match against rack if match level is RACK or NON-LOCAL
-      // if scheduling delay is 0, match at RACK allowed without a sleep
-      if (assignedContainers.isEmpty()) {
-        if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
-          (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
-            || localityMatchLevel.equals(
-              HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
-          assignReUsedContainerWithLocation(containerToAssign,
-              RACK_LOCAL_ASSIGNER, assignedContainers, false);
-          if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using rack"
-              + ", containerId=" + heldContainer.getContainer().getId());
-          }
-        }
-      }
-
-      // if re-use allowed at non-local
-      // match against non-local if match level is NON-LOCAL
-      // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
-      if (assignedContainers.isEmpty()) {
-        if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
-            || localityMatchLevel.equals(
-                HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
-         assignReUsedContainerWithLocation(containerToAssign,
-              NON_LOCAL_ASSIGNER, assignedContainers, false);
-          if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using non-local"
-                + ", containerId=" + heldContainer.getContainer().getId());
-          }
-        }
-      }
-
-      // Nothing matched at any permitted level: decide between releasing the
-      // container and queueing it for another attempt.
-      if (assignedContainers.isEmpty()) {
-
-        long currentTime = System.currentTimeMillis();
-
-        // Release container if final expiry time is reached
-        // Don't release a new container. The RM may not give us new ones
-        if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
-          && sessionDelay != -1) {
-          LOG.info("Container's session delay expired. Releasing container"
-            + ", containerId=" + heldContainer.container.getId()
-            + ", containerExpiryTime="
-            + heldContainer.getContainerExpiryTime()
-            + ", sessionDelay=" + sessionDelay);
-          releaseUnassignedContainers(
-            Lists.newArrayList(heldContainer.container));
-        } else {
-
-          // Let's decide if this container has hit the end of the road
-
-          // EOL true if container's match level is NON-LOCAL
-          boolean hitFinalMatchLevel = localityMatchLevel.equals(
-            HeldContainer.LocalityMatchLevel.NON_LOCAL);
-          if (!hitFinalMatchLevel) {
-            // EOL also true if locality delay is 0
-            // or rack-local or non-local is disabled
-            // (the check below reads the match level after the increment)
-            heldContainer.incrementLocalityMatchLevel();
-            if (localitySchedulingDelay == 0 ||
-                (!reuseRackLocal
-                  || (!reuseNonLocal &&
-                    heldContainer.getLocalityMatchLevel().equals(
-                        HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
-              hitFinalMatchLevel = true;
-            }
-            // the above if-stmt does not apply to new containers since they will
-            // be matched at all locality levels. So there finalMatchLevel cannot
-            // be short-circuited
-            if (localitySchedulingDelay > 0 && isNew) {
-              hitFinalMatchLevel = false;
-            }
-          }
-          
-          if (hitFinalMatchLevel) {
-            boolean safeToRelease = true;
-            Priority topPendingPriority = amRmClient.getTopPriority();
-            Priority containerPriority = heldContainer.container.getPriority();
-            if (isNew && topPendingPriority != null &&
-                containerPriority.compareTo(topPendingPriority) < 0) {
-              // this container is of lower priority and given to us by the RM for
-              // a task that will be matched after the current top priority. Keep
-              // this container for those pending tasks since the RM is not going
-              // to give this container to us again
-              safeToRelease = false;
-            }
-            
-            // Are there any pending requests at any priority?
-            // release if there are tasks or this is not a session
-            if (safeToRelease && 
-                (!taskRequests.isEmpty() || !appContext.isSession())) {
-              LOG.info("Releasing held container as either there are pending but "
-                + " unmatched requests or this is not a session"
-                + ", containerId=" + heldContainer.container.getId()
-                + ", pendingTasks=" + !taskRequests.isEmpty()
-                + ", isSession=" + appContext.isSession()
-                + ". isNew=" + isNew);
-              releaseUnassignedContainers(
-                Lists.newArrayList(heldContainer.container));
-            } else {
-              // if no tasks, treat this like an idle session
-              heldContainer.resetLocalityMatchLevel();
-              delayedContainerManager.addDelayedContainer(
-                heldContainer.getContainer(),
-                currentTime + localitySchedulingDelay);
-            }
-          } else {
-            // Schedule delay container to match at a later try
-            delayedContainerManager.addDelayedContainer(
-                heldContainer.getContainer(),
-                currentTime + localitySchedulingDelay);
-          }
-        }
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("Delayed container assignment successful"
-            + ", containerId=" + heldContainer.getContainer().getId());
-      }
-
-      return assignedContainers;
-    } else {
-      // ignore all other cases?
-      LOG.warn("Received a request to assign re-used containers when AM was "
-        + " in state: " + state + ". Ignoring request and releasing container"
-        + ": " + heldContainer.getContainer().getId());
-      releaseUnassignedContainers(Lists.newArrayList(heldContainer.container));
-    }
-
-    // reached from the IDLE / no-requests branch and from the fallback branch
-    return null;
-  }
-
-  /**
-   * Resets the locality match level of every held container and then calls
-   * {@code notify()} on the delayed container manager's monitor.
-   */
-  @Override
-  public synchronized void resetMatchLocalityForAllHeldContainers() {
-    for (HeldContainer held : heldContainers.values()) {
-      held.resetLocalityMatchLevel();
-    }
-    // notify() must be called while holding the manager's own monitor
-    synchronized (delayedContainerManager) {
-      delayedContainerManager.notify();
-    }
-  }
-
-  /**
-   * Forwards a shutdown request to the application, unless the scheduler has
-   * already been stopped.
-   */
-  @Override
-  public void onShutdownRequest() {
-    if (!isStopped.get()) {
-      // upcall to app must be outside locks
-      appClientDelegate.appShutdownRequested();
-    }
-  }
-
-  /**
-   * Forwards node updates to the application, unless the scheduler has
-   * already been stopped.
-   *
-   * @param updatedNodes node reports to pass on
-   */
-  @Override
-  public void onNodesUpdated(List<NodeReport> updatedNodes) {
-    if (!isStopped.get()) {
-      // ignore bad nodes for now
-      // upcall to app must be outside locks
-      appClientDelegate.nodesUpdated(updatedNodes);
-    }
-  }
-
-  /**
-   * Returns the application's progress. While the recorded total memory is
-   * still zero, the total resources are first set to a copy of the currently
-   * available resources; preemption is evaluated on every call.
-   *
-   * @return 1 once the scheduler is stopped, otherwise the progress reported
-   *         by the application
-   */
-  @Override
-  public float getProgress() {
-    if (isStopped.get()) {
-      return 1;
-    }
-
-    boolean totalNotYetKnown = totalResources.getMemory() == 0;
-    if (totalNotYetKnown) {
-      // assume this is the first allocate callback. nothing is allocated.
-      // available resource = totalResource
-      // TODO this will not handle dynamic changes in resources
-      totalResources = Resources.clone(getAvailableResources());
-      LOG.info("App total resource memory: " + totalResources.getMemory()
-          + " cpu: " + totalResources.getVirtualCores()
-          + " taskAllocations: " + taskAllocations.size());
-    }
-
-    preemptIfNeeded();
-
-    float appProgress = appClientDelegate.getProgress();
-    return appProgress;
-  }
-
-  /**
-   * Forwards an error to the application, unless the scheduler has already
-   * been stopped.
-   *
-   * @param t the error to pass on
-   */
-  @Override
-  public void onError(Throwable t) {
-    if (!isStopped.get()) {
-      appClientDelegate.onError(t);
-    }
-  }
-
-  /**
-   * @return the total resources currently recorded by this scheduler
-   */
-  @Override
-  public Resource getTotalResources() {
-    return this.totalResources;
-  }
-
-  /**
-   * Adds a node to the RM client's blacklist and records it locally.
-   *
-   * @param nodeId the node to blacklist
-   */
-  @Override
-  public synchronized void blacklistNode(NodeId nodeId) {
-    String message = "Blacklisting node: " + nodeId;
-    LOG.info(message);
-    amRmClient.addNodeToBlacklist(nodeId);
-    blacklistedNodes.add(nodeId);
-  }
-  
-  /**
-   * Removes a node from the blacklist. Nodes that were not recorded as
-   * blacklisted are left alone and the RM client is not contacted.
-   *
-   * @param nodeId the node to remove from the blacklist
-   */
-  @Override
-  public synchronized void unblacklistNode(NodeId nodeId) {
-    boolean wasBlacklisted = blacklistedNodes.remove(nodeId);
-    if (!wasBlacklisted) {
-      return;
-    }
-    LOG.info("UnBlacklisting node: " + nodeId);
-    amRmClient.removeNodeFromBlacklist(nodeId);
-  }
-  
-  /**
-   * Requests a container for a task with optional host and rack preferences.
-   *
-   * @param task the task needing a container
-   * @param capability resources requested for the task
-   * @param hosts preferred hosts, may be null
-   * @param racks preferred racks, may be null
-   * @param priority priority of the request
-   * @param containerSignature signature stored in the request cookie
-   * @param clientCookie opaque client object stored in the request cookie
-   */
-  @Override
-  public synchronized void allocateTask(
-      Object task,
-      Resource capability,
-      String[] hosts,
-      String[] racks,
-      Priority priority,
-      Object containerSignature,
-      Object clientCookie) {
-
-    // XXX Have ContainerContext implement an interface defined by TaskScheduler.
-    // TODO check for nulls etc
-    // TODO extra memory allocation
-    CRCookie requestCookie =
-        new CRCookie(task, clientCookie, containerSignature);
-    CookieContainerRequest containerRequest = new CookieContainerRequest(
-        capability, hosts, racks, priority, requestCookie);
-
-    addRequestAndTrigger(task, containerRequest, hosts, racks);
-  }
-  
-  /**
-   * Requests a container for a task, preferring the node of an already held
-   * container. If the container is unknown, or the requested capability does
-   * not fit in it, a warning is logged and the request is made without any
-   * host preference.
-   *
-   * @param task the task needing a container
-   * @param capability resources requested for the task
-   * @param containerId the held container whose node is preferred
-   * @param priority priority of the request
-   * @param containerSignature signature stored in the request cookie
-   * @param clientCookie opaque client object stored in the request cookie
-   */
-  @Override
-  public synchronized void allocateTask(
-      Object task,
-      Resource capability,
-      ContainerId containerId,
-      Priority priority,
-      Object containerSignature,
-      Object clientCookie) {
-
-    String[] hosts = null;
-    String[] racks = null;
-    HeldContainer affinityTarget = heldContainers.get(containerId);
-    if (affinityTarget == null) {
-      LOG.warn("Matching requested to unknown container: " + containerId);
-    } else {
-      Container targetContainer = affinityTarget.getContainer();
-      if (!canFit(capability, targetContainer.getResource())) {
-        LOG.warn("Matching requested to container: " + containerId
-            + " but requested capability: " + capability
-            + " does not fit in container resource: "
-            + targetContainer.getResource());
-      } else {
-        // just specify node and use YARN's soft locality constraint for the rest
-        hosts = new String[] { targetContainer.getNodeId().getHost() };
-        priorityHasAffinity.add(priority);
-      }
-    }
-
-    CRCookie requestCookie =
-        new CRCookie(task, clientCookie, containerSignature);
-    CookieContainerRequest containerRequest = new CookieContainerRequest(
-        capability, containerId, hosts, racks, priority, requestCookie);
-
-    addRequestAndTrigger(task, containerRequest, hosts, racks);
-  }
-  
-  /**
-   * Records a task request, triggers a scheduling pass of the delayed
-   * container manager and logs the request.
-   *
-   * @param task the task the request belongs to
-   * @param request the container request to record
-   * @param hosts requested hosts; only the first one is logged
-   * @param racks requested racks; only the first one is logged
-   */
-  private void addRequestAndTrigger(Object task, CookieContainerRequest request,
-      String[] hosts, String[] racks) {
-    addTaskRequest(task, request);
-    // See if any of the delayedContainers can be used for this task.
-    delayedContainerManager.triggerScheduling(true);
-
-    String firstHost = "null";
-    if (hosts != null && hosts.length > 0) {
-      firstHost = hosts[0];
-    }
-    String firstRack = "null";
-    if (racks != null && racks.length > 0) {
-      firstRack = racks[0];
-    }
-    LOG.info("Allocation request for task: " + task
-        + " with request: " + request
-        + " host: " + firstHost
-        + " rack: " + firstRack);
-  }
-
-  /**
-   * De-allocates a task. A task that only has a pending request is simply
-   * dropped. For a task that holds a container, the container is either
-   * released (task failed, or re-use disabled) or offered to other pending
-   * tasks through the delayed-container path.
-   *
-   * @param task
-   *          the task to de-allocate.
-   * @param taskSucceeded
-   *          specify whether the task succeeded or failed.
-   * @return true if a container is assigned to this task.
-   */
-  @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
-    Map<CookieContainerRequest, Container> reassignments = null;
-
-    synchronized (this) {
-      if (removeTaskRequest(task) != null) {
-        // task not allocated yet
-        LOG.info("Deallocating task: " + task + " before allocation");
-        return false;
-      }
-
-      // task request not present. Look in allocations
-      Container freed = doBookKeepingForTaskDeallocate(task);
-      if (freed == null) {
-        // task neither requested nor allocated.
-        LOG.info("Ignoring removal of unknown task: " + task);
-        return false;
-      }
-      LOG.info("Deallocated task: " + task + " from container: "
-          + freed.getId());
-
-      boolean reusable = taskSucceeded && shouldReuseContainers;
-      if (reusable) {
-        // Don't attempt to delay containers if delay is 0.
-        HeldContainer held = heldContainers.get(freed.getId());
-        if (held == null) {
-          LOG.info("Skipping container after task deallocate as container is"
-              + " no longer running, containerId=" + freed.getId());
-        } else {
-          held.resetLocalityMatchLevel();
-          long now = System.currentTimeMillis();
-          if (sessionDelay > 0) {
-            held.setContainerExpiryTime(now + sessionDelay);
-          }
-          reassignments = assignDelayedContainer(held);
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Releasing container, containerId=" + freed.getId()
-              + ", taskSucceeded=" + taskSucceeded
-              + ", reuseContainersFlag=" + shouldReuseContainers);
-        }
-        releaseContainer(freed.getId());
-      }
-    }
-
-    // up call outside of the lock.
-    boolean exactlyOneAssignment =
-        reassignments != null && reassignments.size() == 1;
-    if (exactlyOneAssignment) {
-      informAppAboutAssignments(reassignments);
-    }
-    return true;
-  }
-  
-  /**
-   * Un-assigns a container, asking for it to be released if found.
-   *
-   * @param containerId the container to de-allocate
-   * @return the task returned by {@code unAssignContainer} for this
-   *         container, or null if there was none
-   */
-  @Override
-  public synchronized Object deallocateContainer(ContainerId containerId) {
-    Object previousTask = unAssignContainer(containerId, true);
-    if (previousTask == null) {
-      LOG.info("Ignoring dealloction of unknown container: " + containerId);
-      return null;
-    }
-
-    LOG.info("Deallocated container: " + containerId
-        + " from task: " + previousTask);
-    return previousTask;
-  }
-
-  /**
-   * Checks whether one resource fits inside another on both memory and
-   * virtual cores.
-   *
-   * @param arg0 the resource that has to fit
-   * @param arg1 the resource it has to fit into
-   * @return true if arg0's memory and virtual cores are both no larger than
-   *         arg1's
-   */
-  boolean canFit(Resource arg0, Resource arg1) {
-    int neededMemory = arg0.getMemory();
-    int offeredMemory = arg1.getMemory();
-    int neededCores = arg0.getVirtualCores();
-    int offeredCores = arg1.getVirtualCores();
-
-    return neededMemory <= offeredMemory && neededCores <= offeredCores;
-  }
-  
-  /**
-   * Frees resources for the highest priority pending request when that
-   * request does not fit in the currently free resources
-   * (totalResources - allocatedResources; only memory is compared, see
-   * fitsIn).
-   *
-   * Steps, all under this object's lock:
-   * 1. Pick the highest priority request among taskRequests.
-   * 2. If delayed containers exist: return without doing anything when one of
-   *    them is a re-used container or has had fewer than 3 assignment
-   *    attempts; otherwise release one new container and re-send one pending
-   *    request at that container's priority.
-   * 3. If nothing was released, pick a running task to preempt.
-   *
-   * The preemption upcall to the application is made after the lock is
-   * dropped.
-   */
-  void preemptIfNeeded() {
-    ContainerId preemptedContainer = null;
-    synchronized (this) {
-      Resource freeResources = Resources.subtract(totalResources,
-        allocatedResources);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
-          " cpu:" + allocatedResources.getVirtualCores() + 
-          " delayedContainers: " + delayedContainerManager.delayedContainers.size());
-      }
-      assert freeResources.getMemory() >= 0;
-  
-      // find the pending request that isHigherPriority() ranks first
-      CookieContainerRequest highestPriRequest = null;
-      for(CookieContainerRequest request : taskRequests.values()) {
-        if(highestPriRequest == null) {
-          highestPriRequest = request;
-        } else if(isHigherPriority(request.getPriority(),
-                                     highestPriRequest.getPriority())){
-          highestPriRequest = request;
-        }
-      }
-      if(highestPriRequest != null &&
-         !fitsIn(highestPriRequest.getCapability(), freeResources)) {
-        // highest priority request will not fit in existing free resources
-        // free up some more
-        // TODO this is subject to error wrt RM resource normalization
-        
-        // This request must have been considered for matching with all existing
-        // containers when request was made.
-        Container lowestPriNewContainer = null;
-        // First check whether an unused (delayed) container can be released
-        // instead of preempting a running one.
-        for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
-          if (!heldContainer.isNew()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Reused container exists. Wait for assignment loop to release it. "
-                  + heldContainer.getContainer().getId());
-            }
-            return;
-          }
-          if (heldContainer.geNumAssignmentAttempts() < 3) {
-            // we haven't tried to assign this container at node/rack/ANY
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Brand new container. Wait for assignment loop to match it. "
-                  + heldContainer.getContainer().getId());
-            }
-            return;
-          }
-          Container container = heldContainer.getContainer();
-          if (lowestPriNewContainer == null ||
-              isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
-            // there is a lower priority new container
-            lowestPriNewContainer = container;
-          }
-        }
-        
-        if (lowestPriNewContainer != null) {
-          LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
-              " with priority: " + lowestPriNewContainer.getPriority() + 
-              " to free resource for request: " + highestPriRequest +
-              " . Current free resources: " + freeResources);
-          releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
-          // We are returning an unused resource back the RM. The RM thinks it
-          // has serviced our initial request and will not re-allocate this back
-          // to us anymore. So we need to ask for this again. If there is no
-          // outstanding request at that priority then its fine to not ask again.
-          // See TEZ-915 for more details
-          for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
-            Object task = entry.getKey();
-            CookieContainerRequest request = entry.getValue();
-            if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
-              LOG.info("Resending request for task again: " + task);
-              // drop and re-add the request; the loop is left via break right
-              // after (presumably both calls modify taskRequests - confirm)
-              deallocateTask(task, true);
-              allocateTask(task, request.getCapability(), 
-                  (request.getNodes() == null ? null : 
-                    request.getNodes().toArray(new String[request.getNodes().size()])), 
-                    (request.getRacks() == null ? null : 
-                      request.getRacks().toArray(new String[request.getRacks().size()])), 
-                    request.getPriority(), 
-                    request.getCookie().getContainerSignature(),
-                    request.getCookie().getAppCookie());
-              break;
-            }
-          }
-          
-          return;
-        }
-        
-        // this assert will be a no-op in production but can help identify
-        // invalid assumptions during testing
-        assert delayedContainerManager.delayedContainers.isEmpty();
-        
-        // there are no reused or new containers to release
-        // try to preempt running containers
-        Map.Entry<Object, Container> preemptedEntry = null;
-        for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
-          // NOTE(review): the lookup result is dereferenced without a null
-          // check - confirm every allocated container is in heldContainers
-          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
-          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
-          Priority taskPriority = lastTaskInfo.getPriority();
-          Object signature = lastTaskInfo.getCookie().getContainerSignature();
-          if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
-            // higher or same priority
-            continue;
-          }
-          if (containerSignatureMatcher.isExactMatch(
-              highestPriRequest.getCookie().getContainerSignature(),
-              signature)) {
-            // exact match with different priorities
-            continue;
-          }
-          // NOTE(review): compares the task's priority against the priority
-          // of the candidate's container, not of its last task - confirm
-          if(preemptedEntry == null ||
-             !isHigherPriority(taskPriority, 
-                 preemptedEntry.getValue().getPriority())) {
-            // keep the lower priority or the one added later
-            preemptedEntry = entry;
-          }
-        }
-        if(preemptedEntry != null) {
-          // found something to preempt
-          LOG.info("Preempting task: " + preemptedEntry.getKey() +
-              " to free resource for request: " + highestPriRequest +
-              " . Current free resources: " + freeResources);
-          preemptedContainer = preemptedEntry.getValue().getId();
-          // app client will be notified after the container is killed
-          // and we get its completed container status
-        }
-      }
-    }
-    
-    // upcall outside locks
-    if (preemptedContainer != null) {
-      appClientDelegate.preemptContainer(preemptedContainer);
-    }
-  }
-
-  private boolean fitsIn(Resource toFit, Resource resource) {
-    // YARN-893 prevents using correct library code
-    //return Resources.fitsIn(toFit, resource);
-    return resource.getMemory() >= toFit.getMemory();
-  }
-
-  private CookieContainerRequest getMatchingRequestWithPriority(
-      Container container,
-      String location) {
-    Priority priority = container.getPriority();
-    Resource capability = container.getResource();
-    List<? extends Collection<CookieContainerRequest>> requestsList =
-        amRmClient.getMatchingRequests(priority, location, capability);
-
-    if (!requestsList.isEmpty()) {
-      // pick first one
-      for (Collection<CookieContainerRequest> requests : requestsList) {
-        for (CookieContainerRequest cookieContainerRequest : requests) {
-          if (canAssignTaskToContainer(cookieContainerRequest, container)) {
-            return cookieContainerRequest;
-          }
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private CookieContainerRequest getMatchingRequestWithoutPriority(
-      Container container,
-      String location,
-      boolean considerContainerAffinity) {
-    Resource capability = container.getResource();
-    List<? extends Collection<CookieContainerRequest>> pRequestsList =
-      amRmClient.getMatchingRequestsForTopPriority(location, capability);
-    if (considerContainerAffinity && 
-        !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
-      considerContainerAffinity = false;
-    }
-    if (pRequestsList == null || pRequestsList.isEmpty()) {
-      return null;
-    }
-    CookieContainerRequest firstMatch = null;
-    for (Collection<CookieContainerRequest> requests : pRequestsList) {
-      for (CookieContainerRequest cookieContainerRequest : requests) {
-        if (firstMatch == null || // we dont have a match. So look for one 
-            // we have a match but are looking for a better container level match.
-            // skip the expensive canAssignTaskToContainer() if the request is 
-            // not affinitized to the container
-            container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
-            ) {
-          if (canAssignTaskToContainer(cookieContainerRequest, container)) {
-            // request matched to container
-            if (!considerContainerAffinity) {
-              return cookieContainerRequest;
-            }
-            ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
-            boolean canMatchTaskWithAffinity = true;
-            if (affCId == null || 
-                !heldContainers.containsKey(affCId) ||
-                inUseContainers.contains(affCId)) {
-              // affinity not specified
-              // affinitized container is no longer held
-              // affinitized container is in use
-              canMatchTaskWithAffinity = false;
-            }
-            if (canMatchTaskWithAffinity) {
-              if (container.getId().equals(
-                  cookieContainerRequest.getAffinitizedContainer())) {
-                // container level match
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Matching with affinity for request: "
-                      + cookieContainerRequest + " container: " + affCId);
-                }
-                return cookieContainerRequest;
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Skipping request for container " + container.getId()
-                    + " due to affinity. Request: " + cookieContainerRequest
-                    + " affContainer: " + affCId);
-              }
-            } else {
-              firstMatch = cookieContainerRequest;
-            }
-          }
-        }
-      }
-    }
-    
-    return firstMatch;
-  }
-
-  private boolean canAssignTaskToContainer(
-      CookieContainerRequest cookieContainerRequest, Container container) {
-    HeldContainer heldContainer = heldContainers.get(container.getId());
-    if (heldContainer == null || heldContainer.isNew()) { // New container.
-      return true;
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to match task to a held container, "
-            + " containerId=" + heldContainer.container.getId());
-      }
-      if (containerSignatureMatcher.isSuperSet(heldContainer
-          .getFirstContainerSignature(), cookieContainerRequest.getCookie()
-          .getContainerSignature())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Matched delayed container to task"
-            + " containerId=" + heldContainer.container.getId());
-        }
-        return true;
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Failed to match delayed container to task"
-        + " containerId=" + heldContainer.container.getId());
-    }
-    return false;
-  }
-
-  private Object getTask(CookieContainerRequest request) {
-    return request.getCookie().getTask();
-  }
-
-  private void releaseContainer(ContainerId containerId) {
-    Object assignedTask = containerAssignments.remove(containerId);
-    if (assignedTask != null) {
-      // A task was assigned to this container at some point. Inform the app.
-      appClientDelegate.containerBeingReleased(containerId);
-    }
-    HeldContainer delayedContainer = heldContainers.remove(containerId);
-    if (delayedContainer != null) {
-      Resources.subtractFrom(allocatedResources,
-          delayedContainer.getContainer().getResource());
-    }
-    if (delayedContainer != null || !shouldReuseContainers) {
-      amRmClient.releaseAssignedContainer(containerId);
-    }
-    if (assignedTask != null) {
-      // A task was assigned at some point. Add to release list since we are
-      // releasing the container.
-      releasedContainers.put(containerId, assignedTask);
-    }
-  }
-
-  private void assignContainer(Object task,
-      Container container,
-      CookieContainerRequest assigned) {
-    CookieContainerRequest request = removeTaskRequest(task);
-    assert request != null;
-    //assert assigned.equals(request);
-
-    Container result = taskAllocations.put(task, container);
-    assert result == null;
-    inUseContainers.add(container.getId());
-    containerAssignments.put(container.getId(), task);
-    HeldContainer heldContainer = heldContainers.get(container.getId()); 
-    if (!shouldReuseContainers && heldContainer == null) {
-      heldContainers.put(container.getId(), new HeldContainer(container,
-        -1, -1, assigned));
-      Resources.addTo(allocatedResources, container.getResource());
-    } else {
-      if (heldContainer.isNew()) {
-        // check for existence before adding since the first container potentially
-        // has the broadest signature as subsequent uses dont expand any dimension.
-        // This will need to be enhanced to track other signatures too when we
-        // think about preferring within vertex matching etc.
-        heldContainers.put(container.getId(),
-            new HeldContainer(container, heldContainer.getNextScheduleTime(),
-                heldContainer.getContainerExpiryTime(), assigned));
-      }
-      heldContainer.setLastTaskInfo(assigned);
-    }
-  }
-  
-  private void pushNewContainerToDelayed(List<Container> containers){
-    long expireTime = -1;
-    if (sessionDelay > 0) {
-      long currentTime = System.currentTimeMillis();
-      expireTime = currentTime + sessionDelay;
-    }
-
-    synchronized (delayedContainerManager) {
-      for (Container container : containers) {
-        if (heldContainers.put(container.getId(), new HeldContainer(container,
-            -1, expireTime, null)) != null) {
-          throw new TezUncheckedException("New container " + container.getId()
-              + " is already held.");
-        }
-        long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
-        if (delayedContainerManager.maxScheduleTimeSeen == -1) {
-          nextScheduleTime = System.currentTimeMillis();
-        }
-        Resources.addTo(allocatedResources, container.getResource());
-        delayedContainerManager.addDelayedContainer(container,
-          nextScheduleTime + 1);
-      }
-    }
-    delayedContainerManager.triggerScheduling(false);      
-  }
-
-  private CookieContainerRequest removeTaskRequest(Object task) {
-    CookieContainerRequest request = taskRequests.remove(task);
-    if(request != null) {
-      // remove all references of the request from AMRMClient
-      amRmClient.removeContainerRequest(request);
-    }
-    return request;
-  }
-
-  private void addTaskRequest(Object task,
-                                CookieContainerRequest request) {
-    CookieContainerRequest oldRequest = taskRequests.put(task, request);
-    if (oldRequest != null) {
-      // remove all references of the request from AMRMClient
-      amRmClient.removeContainerRequest(oldRequest);
-    }
-    amRmClient.addContainerRequest(request);
-  }
-
-  private Container doBookKeepingForTaskDeallocate(Object task) {
-    Container container = taskAllocations.remove(task);
-    if (container == null) {
-      return null;
-    }
-    inUseContainers.remove(container.getId());
-    return container;
-  }
-
-  private Object unAssignContainer(ContainerId containerId,
-                                    boolean releaseIfFound) {
-    // Not removing. containerAssignments tracks the last task run on a
-    // container.
-    Object task = containerAssignments.get(containerId);
-    if(task == null) {
-      return null;
-    }
-    Container container = taskAllocations.remove(task);
-    assert container != null;
-    inUseContainers.remove(containerId);
-    if(releaseIfFound) {
-      releaseContainer(containerId);
-    }
-    return task;
-  }
-
-  private boolean isHigherPriority(Priority lhs, Priority rhs) {
-    return lhs.getPriority() < rhs.getPriority();
-  }
-
-  private synchronized void assignNewContainersWithLocation(
-      Iterable<Container> containers,
-      ContainerAssigner assigner,
-      Map<CookieContainerRequest, Container> assignedContainers) {
-
-    Iterator<Container> containerIterator = containers.iterator();
-    while (containerIterator.hasNext()) {
-      Container container = containerIterator.next();
-      CookieContainerRequest assigned =
-        assigner.assignNewContainer(container);
-      if (assigned != null) {
-        assignedContainers.put(assigned, container);
-        containerIterator.remove();
-      }
-    }
-  }
-
-  private synchronized void assignReUsedContainersWithLocation(
-      Iterable<Container> containers,
-      ContainerAssigner assigner,
-      Map<CookieContainerRequest, Container> assignedContainers,
-      boolean honorLocality) {
-
-    Iterator<Container> containerIterator = containers.iterator();
-    while (containerIterator.hasNext()) {
-      Container container = containerIterator.next();
-      if (assignReUsedContainerWithLocation(container, assigner,
-          assignedContainers, honorLocality)) {
-        containerIterator.remove();
-      }
-    }
-  }
-
-  private synchronized boolean assignReUsedContainerWithLocation(
-    Container container,
-    ContainerAssigner assigner,
-    Map<CookieContainerRequest, Container> assignedContainers,
-    boolean honorLocality) {
-
-    Priority containerPriority = container.getPriority();
-    Priority topPendingTaskPriority = amRmClient.getTopPriority();
-    if (topPendingTaskPriority == null) {
-      // nothing left to assign
-      return false;
-    }
-    
-    if (topPendingTaskPriority.compareTo(containerPriority) > 0) {
-      // if the next task to assign is higher priority than the container then 
-      // dont assign this container to that task.
-      // if task and container are equal priority - then its first use or reuse
-      // within the same priority - safe to use
-      // if task is lower priority than container then its we use a container that
-      // is no longer needed by higher priority tasks All those higher pri tasks 
-      // have been assigned resources - safe to use (first use or reuse)
-      // if task is higher priority than container then we may end up using a 
-      // container that was assigned by the RM for a lower priority pending task 
-      // that will be assigned after this higher priority task is assigned. If we
-      // use that task's container now then we may not be able to match this 
-      // container to that task later on. However the RM has already assigned us 
-      // all containers and is not going to give us new containers. We will get 
-      // stuck for resources.
-      return false;
-    }
-    
-    CookieContainerRequest assigned =
-      assigner.assignReUsedContainer(container, honorLocality);
-    if (assigned != null) {
-      assignedContainers.put(assigned, container);
-      return true;
-    }
-    return false;
-  }
-
-  private void releaseUnassignedContainers(Iterable<Container> containers) {
-    for (Container container : containers) {
-      LOG.info("Releasing unused container: "
-          + container.getId());
-      releaseContainer(container.getId());
-    }
-  }
-
-  private void informAppAboutAssignment(CookieContainerRequest assigned,
-      Container container) {
-    appClientDelegate.taskAllocated(getTask(assigned),
-        assigned.getCookie().getAppCookie(), container);
-  }
-
-  private void informAppAboutAssignments(
-      Map<CookieContainerRequest, Container> assignedContainers) {
-    if (assignedContainers == null || assignedContainers.isEmpty()) {
-      return;
-    }
-    for (Entry<CookieContainerRequest, Container> entry : assignedContainers
-        .entrySet()) {
-      Container container = entry.getValue();
-      // check for blacklisted nodes. There may be race conditions between
-      // setting blacklist and receiving allocations
-      if (blacklistedNodes.contains(container.getNodeId())) {
-        CookieContainerRequest request = entry.getKey();
-        Object task = getTask(request);
-        LOG.info("Container: " + container.getId() + 
-            " allocated on blacklisted node: " + container.getNodeId() + 
-            " for task: " + task);
-        Object deAllocTask = deallocateContainer(container.getId());
-        assert deAllocTask.equals(task);
-        // its ok to submit the same request again because the RM will not give us
-        // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
-        // and so its better to give the RM the full information.
-        allocateTask(task, request.getCapability(), 
-            (request.getNodes() == null ? null : 
-            request.getNodes().toArray(new String[request.getNodes().size()])), 
-            (request.getRacks() == null ? null : 
-              request.getRacks().toArray(new String[request.getRacks().size()])), 
-            request.getPriority(), 
-            request.getCookie().getContainerSignature(), 
-            request.getCookie().getAppCookie());
-      } else {
-        informAppAboutAssignment(entry.getKey(), container);
-      }
-    }
-  }
-
-  private abstract class ContainerAssigner {
-
-    protected final String locality;
-
-    protected ContainerAssigner(String locality) {
-      this.locality = locality;
-    }
-
-    public abstract CookieContainerRequest assignNewContainer(
-        Container container);
-
-    public abstract CookieContainerRequest assignReUsedContainer(
-      Container container, boolean honorLocality);
-
-    public void doBookKeepingForAssignedContainer(
-        CookieContainerRequest assigned, Container container,
-        String matchedLocation, boolean honorLocalityFlags) {
-      if (assigned == null) {
-        return;
-      }
-      Object task = getTask(assigned);
-      assert task != null;
-
-      LOG.info("Assigning container to task"
-        + ", container=" + container
-        + ", task=" + task
-        + ", containerHost=" + container.getNodeId().getHost()
-        + ", localityMatchType=" + locality
-        + ", matchedLocation=" + matchedLocation
-        + ", honorLocalityFlags=" + honorLocalityFlags
-        + ", reusedContainer="
-        + containerAssignments.containsKey(container.getId())
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", containerResourceMemory=" + container.getResource().getMemory()
-        + ", containerResourceVCores="
-        + container.getResource().getVirtualCores());
-
-      assignContainer(task, container, assigned);
-    }
-  }
-  
-  private class NodeLocalContainerAssigner extends ContainerAssigner {
-
-    NodeLocalContainerAssigner() {
-      super("NodeLocal");
-    }
-
-    @Override
-    public CookieContainerRequest assignNewContainer(Container container) {
-      String location = container.getNodeId().getHost();
-      CookieContainerRequest assigned = getMatchingRequestWithPriority(
-          container, location);
-      doBookKeepingForAssignedContainer(assigned, container, location, false);
-      return assigned;
-    }
-
-    @Override
-    public CookieContainerRequest assignReUsedContainer(Container container,
-        boolean honorLocality) {
-      String location = container.getNodeId().getHost();
-      CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-        container, location, true);
-      doBookKeepingForAssignedContainer(assigned, container, location, true);
-      return assigned;
-
-    }
-  }
-
-  private class RackLocalContainerAssigner extends ContainerAssigner {
-
-    RackLocalContainerAssigner() {
-      super("RackLocal");
-    }
-
-    @Override
-    public CookieContainerRequest assignNewContainer(Container container) {
-      String location = RackResolver.resolve(container.getNodeId().getHost())
-          .getNetworkLocation();
-      CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
-          location);
-      doBookKeepingForAssignedContainer(assigned, container, location, false);
-      return assigned;
-    }
-
-    @Override
-    public CookieContainerRequest assignReUsedContainer(
-      Container container, boolean honorLocality) {
-      // TEZ-586 this is not match an actual rackLocal request unless honorLocality
-      // is false. This method is useless if honorLocality=true
-      if (!honorLocality) {
-        String location = RackResolver.resolve(container.getNodeId().getHost())
-          .getNetworkLocation();
-        CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-            container, location, false);
-        doBookKeepingForAssignedContainer(assigned, container, location,
-            honorLocality);
-        return assigned;
-      }
-      return null;
-    }
-  }
-
-  private class NonLocalContainerAssigner extends ContainerAssigner {
-
-    NonLocalContainerAssigner() {
-      super("NonLocal");
-    }
-
-    @Override
-    public CookieContainerRequest assignNewContainer(Container container) {
-      String location = ResourceRequest.ANY;
-      CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
-          location);
-      doBookKeepingForAssignedContainer(assigned, container, location, false);
-      return assigned;
-    }
-
-    @Override
-    public CookieContainerRequest assignReUsedContainer(Container container,
-        boolean honorLocality) {
-      if (!honorLocality) {
-        String location = ResourceRequest.ANY;
-        CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-          container, location, false);
-        doBookKeepingForAssignedContainer(assigned, container, location,
-            honorLocality);
-        return assigned;
-      }
-      return null;
-    }
-
-  }
-  
-  
-  @VisibleForTesting
-  class DelayedContainerManager extends Thread {
-
-    class HeldContainerTimerComparator implements Comparator<HeldContainer> {
-
-      @Override
-      public int compare(HeldContainer c1,
-          HeldContainer c2) {
-        return (int) (c1.getNextScheduleTime() - c2.getNextScheduleTime());
-      }
-    }
-
-    PriorityBlockingQueue<HeldContainer> delayedContainers =
-      new PriorityBlockingQueue<HeldContainer>(20,
-        new HeldContainerTimerComparator());
-
-    private volatile boolean tryAssigningAll = false;
-    private volatile boolean running = true;
-    private long maxScheduleTimeSeen = -1;
-    
-    // used for testing only
-    @VisibleForTesting
-    volatile AtomicBoolean drainedDelayedContainersForTest = null;
-
-    DelayedContainerManager() {
-      super.setName("DelayedContainerManager");
-    }
-    
-    @Override
-    public void run() {
-      while(running) {
-        // Try assigning all containers if there's a request to do so.
-        if (tryAssigningAll) {
-          doAssignAll();
-          tryAssigningAll = false;
-        }
-
-        // Try allocating containers which have timed out.
-        // Required since these containers may get assigned without
-        // locality at this point.
-        if (delayedContainers.peek() == null) {
-          try {
-            // test only signaling to make TestTaskScheduler work
-            if (drainedDelayedContainersForTest != null) {
-              drainedDelayedContainersForTest.set(true);
-              synchronized (drainedDelayedContainersForTest) {
-                drainedDelayedContainersForTest.notifyAll();
-              }
-            }
-            synchronized(this) {
-              this.wait();
-            }
-            // Re-loop to see if tryAssignAll is set.
-            continue;
-          } catch (InterruptedException e) {
-            LOG.info("AllocatedContainerManager Thread interrupted");
-          }
-        } else {
-          // test only sleep to prevent tight loop cycling that makes tests stall
-          if (drainedDelayedContainersForTest != null) {
-            try {
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-          }
-          HeldContainer delayedContainer = delayedContainers.peek();
-          if (delayedContainer == null) {
-            continue;
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Considering HeldContainer: "
-              + delayedContainer + " for assignment");
-          }
-          long currentTs = System.currentTimeMillis();
-          long nextScheduleTs = delayedContainer.getNextScheduleTime();
-          if (currentTs >= nextScheduleTs) {
-            // Remove the container and try scheduling it.
-            // TEZ-587 what if container is released by RM after this
-            // in onContainerCompleted()
-            delayedContainer = delayedContainers.poll();
-            if (delayedContainer == null) {
-              continue;
-            }
-            Map<CookieContainerRequest, Container> assignedContainers = null;
-            synchronized(TaskScheduler.this) {
-              if (null !=
-                  heldContainers.get(delayedContainer.getContainer().getId())) {
-                assignedContainers = assignDelayedContainer(delayedContainer);
-              } else {
-                LOG.info("Skipping delayed container as container is no longer"
-                  + " running, containerId="
-                  + delayedContainer.getContainer().getId());
-              }
-            }
-            // Inform App should be done outside of the lock
-            informAppAboutAssignments(assignedContainers);
-          } else {
-            synchronized(this) {
-              try {
-                // Wait for the next container to be assignable
-                delayedContainer = delayedContainers.peek();
-                long diff = localitySchedulingDelay;
-                if (delayedContainer != null) {
-                  diff = delayedContainer.getNextScheduleTime() - currentTs;
-                }
-                if (diff > 0) {
-                  this.wait(diff);
-                }
-              } catch (InterruptedException e) {
-                LOG.info("AllocatedContainerManager Thread interrupted");
-              }
-            }
-          }
-        }
-      }
-      releasePendingContainers();
-    }
-    
-    private void doAssignAll() {
-      // The allocatedContainers queue should not be modified in the middle of an iteration over it.
-      // Synchronizing here on TaskScheduler.this to prevent this from happening.
-      // The call to assignAll from within this method should NOT add any
-      // elements back to the allocatedContainers list. Since they're all
-      // delayed elements, de-allocation should not happen either - leaving the
-      // list of delayed containers intact, except for the contaienrs which end
-      // up getting assigned.
-      if (delayedContainers.isEmpty()) {
-        return;
-      }
-
-      Map<CookieContainerRequest, Container> assignedContainers;
-      synchronized(TaskScheduler.this) {
-        // honor reuse-locality flags (container not timed out yet), Don't queue
-        // (already in queue), don't release (release happens when containers
-        // time-out)
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to assign all delayed containers to newly received"
-            + " tasks");
-        }
-        Iterator<HeldContainer> iter = delayedContainers.iterator();
-        while(iter.hasNext()) {
-          HeldContainer delayedContainer = iter.next();
-          if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
-            // this container is no longer held by us
-            LOG.info("AssignAll - Skipping delayed container as container is no longer"
-                + " running, containerId="
-                + delayedContainer.getContainer().getId());
-            iter.remove();
-          }
-        }
-        assignedContainers = tryAssignReUsedContainers(
-          new ContainerIterable(delayedContainers));
-      }
-      // Inform app
-      informAppAboutAssignments(assignedContainers);
-    }
-    
-    /**
-     * Indicate that an attempt should be made to allocate all available containers.
-     * Intended to be used in cases where new Container requests come in 
-     */
-    public void triggerScheduling(boolean scheduleAll) {
-      this.tryAssigningAll = scheduleAll;
-      synchronized(this) {
-        this.notify();
-      }
-    }
-
-    public void shutdown() {
-      this.running = false;
-      this.interrupt();
-    }
-    
-    private void releasePendingContainers() {
-      List<HeldContainer> pendingContainers = Lists.newArrayListWithCapacity(
-        delayedContainers.size());
-      delayedContainers.drainTo(pendingContainers);
-      releaseUnassignedContainers(new ContainerIterable(pendingContainers));
-    }
-
-    private void addDelayedContainer(Container container,
-        long nextScheduleTime) {
-      HeldContainer delayedContainer = heldContainers.get(container.getId());
-      if (delayedContainer == null) {
-        LOG.warn("Attempting to add a non-running container to the"
-            + " delayed container list, containerId=" + container.getId());
-        return;
-      } else {
-        delayedContainer.setNextScheduleTime(nextScheduleTime);
-      }
-      if (maxScheduleTimeSeen < nextScheduleTime) {
-        maxScheduleTimeSeen = nextScheduleTime;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding container to delayed queue"
-          + ", containerId=" + delayedContainer.getContainer().getId()
-          + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
-          + ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
-      }
-      boolean added = delayedContainers.offer(delayedContainer);
-      synchronized(this) {
-        this.notify();
-      }
-      if (!added) {
-        releaseUnassignedContainers(Lists.newArrayList(container));
-      }
-    }
-
-  }
-
-  private class ContainerIterable implements Iterable<Container> {
-
-    private final Iterable<HeldContainer> delayedContainers;
-
-    ContainerIterable(Iterable<HeldContainer> delayedContainers) {
-      this.delayedContainers = delayedContainers;
-    }
-
-    @Override
-    public Iterator<Container> iterator() {
-
-      final Iterator<HeldContainer> delayedContainerIterator = delayedContainers
-          .iterator();
-
-      return new Iterator<Container>() {
-
-        @Override
-        public boolean hasNext() {
-          return delayedContainerIterator.hasNext();
-        }
-
-        @Override
-        public Container next() {
-          return delayedContainerIterator.next().getContainer();
-        }
-
-        @Override
-        public void remove() {
-          delayedContainerIterator.remove();
-        }
-      };
-    }
-  }
-
-  static class HeldContainer {
-
-    enum LocalityMatchLevel {
-      NEW,
-      NODE,
-      RACK,
-      NON_LOCAL
-    }
-
-    Container container;
-    private long nextScheduleTime;
-    private Object firstContainerSignature;
-    private LocalityMatchLevel localityMatchLevel;
-    private long containerExpiryTime;
-    private CookieContainerRequest lastTaskInfo;
-    private int numAssignmentAttempts = 0;
-    
-    HeldContainer(Container container,
-        long nextScheduleTime,
-        long containerExpiryTime,
-        CookieContainerRequest firstTaskInfo) {
-      this.container = container;
-      this.nextScheduleTime = nextScheduleTime;
-      if (firstTaskInfo != null) {
-        this.lastTaskInfo = firstTaskInfo;
-        this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
-      }
-      this.localityMatchLevel = LocalityMatchLevel.NODE;
-      this.containerExpiryTime = containerExpiryTime;
-    }
-    
-    boolean isNew() {
-      return firstContainerSignature == null;
-    }
-    
-    int geNumAssignmentAttempts() {
-      return numAssignmentAttempts;
-    }
-    
-    void incrementAssignmentAttempts() {
-      numAssignmentAttempts++;
-    }
-    
-    public Container getContainer() {
-      return this.container;
-    }
-    
-    public long getNextScheduleTime() {
-      return this.nextScheduleTime;
-    }
-    
-    public void setNextScheduleTime(long nextScheduleTime) {
-      this.nextScheduleTime = nextScheduleTime;
-    }
-
-    public long getContainerExpiryTime() {
-      return this.containerExpiryTime;
-    }
-
-    public void setContainerExpiryTime(long containerExpiryTime) {
-      this.containerExpiryTime = containerExpiryTime;
-    }
-
-    public Object getFirstContainerSignature() {
-      return this.firstContainerSignature;
-    }
-    
-    public CookieContainerRequest getLastTaskInfo() {
-      return this.lastTaskInfo;
-    }
-    
-    public void setLastTaskInfo(CookieContainerRequest taskInfo) {
-      lastTaskInfo = taskInfo;
-    }
-
-    public synchronized void resetLocalityMatchLevel() {
-      localityMatchLevel = LocalityMatchLevel.NEW;
-    }
-
-    public synchronized void incrementLocalityMatchLevel() {
-      if (localityMatchLevel.equals(LocalityMatchLevel.NEW)) {
-        localityMatchLevel = LocalityMatchLevel.NODE;
-      } else if (localityMatchLevel.equals(LocalityMatchLevel.NODE)) {
-        localityMatchLevel = LocalityMatchLevel.RACK;
-      } else if (localityMatchLevel.equals(LocalityMatchLevel.RACK)) {
-        localityMatchLevel = LocalityMatchLevel.NON_LOCAL;
-      } else if (localityMatchLevel.equals(LocalityMatchLevel.NON_LOCAL)) {
-        throw new TezUncheckedException("Cannot increment locality level "
-          + " from current NON_LOCAL for container: " + container.getId());
-      }
-    }
-
-    public LocalityMatchLevel getLocalityMatchLevel() {
-      return this.localityMatchLevel;
-    }
-
-    @Override
-    public String toString() {
-      return "HeldContainer: id: " + container.getId()
-          + ", nextScheduleTime: " + nextScheduleTime
-          + ", localityMatchLevel=" + localityMatchLevel
-          + ", signature: "
-          + (firstContainerSignature != null? firstContainerSignature.toString():"null");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 53c3e95..5de8032 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 
 /**
  * Makes use of an ExecutionService to invoke application callbacks. Methods

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 865b192..ec35f28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -51,7 +51,7 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -75,7 +75,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   protected final AppContext appContext;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-  protected TaskSchedulerInterface taskScheduler;
+  protected TaskSchedulerService taskScheduler;
   private DAGAppMaster dagAppMaster;
   private Map<ApplicationAccessType, String> appAcls = null;
   private Thread eventHandlingThread;
@@ -288,16 +288,16 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
 
-  protected TaskSchedulerInterface createTaskScheduler(String host, int port,
+  protected TaskSchedulerService createTaskScheduler(String host, int port,
       String trackingUrl, AppContext appContext) {
     boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     if (isLocal) {
-      return new LocalTaskScheduler(this, this.containerSignatureMatcher,
+      return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
     }
     else {
-      return new TaskScheduler(this, this.containerSignatureMatcher,
+      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
     }
   }
@@ -308,8 +308,8 @@ public class TaskSchedulerEventHandler extends AbstractService
     dagAppMaster = appContext.getAppMaster();
     taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
         serviceAddr.getPort(), "", appContext);
-    ((AbstractService)taskScheduler).init(getConfig());
-    ((AbstractService)taskScheduler).start();
+    taskScheduler.init(getConfig());
+    taskScheduler.start();
     if (shouldUnregisterFlag.get()) {
       // Flag may have been set earlier when task scheduler was not initialized
       taskScheduler.setShouldUnregister();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
deleted file mode 100644
index 1c35492..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public interface TaskSchedulerInterface {
-
-  public abstract Resource getAvailableResources();
-
-  public abstract int getClusterNodeCount();
-
-  public abstract void resetMatchLocalityForAllHeldContainers();
-
-  public abstract Resource getTotalResources();
-
-  public abstract void blacklistNode(NodeId nodeId);
-
-  public abstract void unblacklistNode(NodeId nodeId);
-
-  public abstract void allocateTask(Object task, Resource capability,
-      String[] hosts, String[] racks, Priority priority,
-      Object containerSignature, Object clientCookie);
-  
-  /**
-   * Allocate affinitized to a specific container
-   */
-  public abstract void allocateTask(Object task, Resource capability,
-      ContainerId containerId, Priority priority, Object containerSignature,
-      Object clientCookie);
-  
-  public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
-
-  public abstract Object deallocateContainer(ContainerId containerId);
-
-  public abstract void setShouldUnregister();
-}