You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:30:58 UTC

[04/11] hive git commit: HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)

HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/347a5a55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/347a5a55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/347a5a55

Branch: refs/heads/llap
Commit: 347a5a5580742a36a875bd6a5f2ac8acd74d3cbf
Parents: 076f365
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Apr 29 15:14:27 2016 +0530
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Apr 29 15:14:27 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hive/llap/tezplugins/ContainerFactory.java  |   3 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 377 ++++++++--
 .../llap/tezplugins/helpers/MonotonicClock.java |  24 +
 .../scheduler/LoggingFutureCallback.java        |  44 ++
 .../TestLlapTaskSchedulerService.java           | 734 ++++++++++++++++++-
 6 files changed, 1068 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 566e9b6..fd725cb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2762,7 +2762,7 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true),
         "Amount of time to wait before allocating a request which contains location information," +
             " to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
-            "for a no delay. Currently these are the only two supported values"
+            "for no delay."
     ),
     LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS(
         "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300",

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
index a314391..f1feec7 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -37,11 +37,10 @@ class ContainerFactory {
   }
 
   public Container createContainer(Resource capability, Priority priority, String hostname,
-      int port) {
+      int port, String nodeHttpAddress) {
     ContainerId containerId =
         ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
     NodeId nodeId = NodeId.newInstance(hostname, port);
-    String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
 
     Container container =
         Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c3d3a1d..da1e17f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +61,8 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
 import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -77,7 +78,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
@@ -91,6 +91,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
 
+  private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
   private final Configuration conf;
 
   // interface into the registry service
@@ -104,6 +106,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   // Tracks tasks which could not be allocated immediately.
   @VisibleForTesting
+  // Tasks are tracked in the order requests come in, at different priority levels.
+  // TODO HIVE-13538 For tasks at the same priority level, it may be worth attempting to schedule tasks with
+  // locality information before those without locality information
   final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() {
     @Override
     public int compare(Priority o1, Priority o2) {
@@ -113,23 +118,30 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   // Tracks running and queued tasks. Cleared after a task completes.
   private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
+  // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started.
   private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>();
-  private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
 
   // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
   @VisibleForTesting
   final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
+  @VisibleForTesting
+  final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
 
-  private final boolean forceLocation;
 
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
-  private final Clock clock;
+  @VisibleForTesting
+  final Clock clock;
 
   private final ListeningExecutorService nodeEnabledExecutor;
   private final NodeEnablerCallable nodeEnablerCallable =
       new NodeEnablerCallable();
 
+  private final ListeningExecutorService delayedTaskSchedulerExecutor;
+  @VisibleForTesting
+  final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@@ -147,6 +159,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
 
   private final NodeBlacklistConf nodeBlacklistConf;
+  private final LocalityDelayConf localityDelayConf;
 
   // Per daemon
   private final int memoryPerInstance;
@@ -168,6 +181,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
+  private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
   private volatile ListenableFuture<Void> schedulerFuture;
 
   @VisibleForTesting
@@ -181,7 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final JvmPauseMonitor pauseMonitor;
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
-    this(taskSchedulerContext, new SystemClock(), true);
+    this(taskSchedulerContext, new MonotonicClock(), true);
   }
 
   @VisibleForTesting
@@ -189,6 +203,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       boolean initMetrics) {
     super(taskSchedulerContext);
     this.clock = clock;
+    this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable();
     try {
       this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
     } catch (IOException e) {
@@ -197,6 +212,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
     this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
         taskSchedulerContext.getCustomClusterIdentifier());
+    // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
+    // publishing potentially different values when we support changing configurations dynamically.
+    // For now, this can simply be fetched from a single registry instance.
     this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
     this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
     this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -212,11 +230,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     long localityDelayMs = HiveConf
         .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
-    if (localityDelayMs == -1) {
-      this.forceLocation = true;
-    } else {
-      this.forceLocation = false;
-    }
+
+    this.localityDelayConf = new LocalityDelayConf(localityDelayMs);
 
     this.timeoutMonitor = new SchedulerTimeoutMonitor();
     this.timeout = HiveConf.getTimeVar(conf,
@@ -240,6 +255,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
     nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
 
+    ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler")
+            .build());
+    delayedTaskSchedulerExecutor =
+        MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
+
     ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
@@ -266,7 +287,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
         + ", nodeBlacklistConf=" + nodeBlacklistConf
-        + ", forceLocation=" + forceLocation);
+        + ", localityDelayMs=" + localityDelayMs);
   }
 
   @Override
@@ -279,29 +300,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
-      Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("NodeEnabledThread exited");
-        }
+      Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
+
+      delayedTaskSchedulerFuture =
+          delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
+      Futures.addCallback(delayedTaskSchedulerFuture,
+          new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
 
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("NodeEnabledThread exited with error", t);
-        }
-      });
       schedulerFuture = schedulerExecutor.submit(schedulerCallable);
-      Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("SchedulerThread exited");
-        }
+      Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
 
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("SchedulerThread exited with error", t);
-        }
-      });
       registry.start();
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
@@ -399,6 +407,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
         timeoutExecutor.shutdownNow();
 
+        delayedTaskSchedulerCallable.shutdown();
+        if (delayedTaskSchedulerFuture != null) {
+          delayedTaskSchedulerFuture.cancel(true);
+        }
+        delayedTaskSchedulerExecutor.shutdownNow();
+
         schedulerCallable.shutdown();
         if (schedulerFuture != null) {
           schedulerFuture.cancel(true);
@@ -502,6 +516,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("BlacklistNode not supported");
+    // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
   }
 
   @Override
@@ -513,7 +528,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
       Priority priority, Object containerSignature, Object clientCookie) {
     TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
+        new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(hosts, racks);
@@ -530,7 +545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // Container affinity can be implemented as Host affinity for LLAP. Not required until
     // 1:1 edges are used in Hive.
     TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
+        new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(null, null);
@@ -558,7 +573,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         return false;
       }
       if (taskInfo.containerId == null) {
-        if (taskInfo.assigned) {
+        if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
           LOG.error("Task: "
               + task
               + " assigned, but could not find the corresponding containerId."
@@ -577,7 +592,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       assert nodeInfo != null;
 
       // Re-enable the node if preempted
-      if (taskInfo.preempted) {
+      if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
         LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
         unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
         nodeInfo.registerUnsuccessfulTaskEnd(true);
@@ -607,7 +622,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           // In case of success, trigger a scheduling run for pending tasks.
           trySchedulingPendingTasks();
 
-        } else if (!taskSucceeded) {
+        } else { // Task Failed
           nodeInfo.registerUnsuccessfulTaskEnd(false);
           if (endReason != null && EnumSet
               .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
@@ -665,17 +680,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return true;
   }
 
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-
   /**
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
   private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
+    long schedulerAttemptTime = clock.getTime();
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
       // If there's no memory available, fail
@@ -683,32 +694,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
       }
 
+      boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
       if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
-        boolean requestedHostExists = false;
+        boolean requestedHostsWillBecomeAvailable = false;
         for (String host : requestedHosts) {
           prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
-            requestedHostExists = true;
             for (ServiceInstance inst : instances) {
               NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-              if (nodeInfo != null && nodeInfo.canAcceptTask()) {
-                LOG.info("Assigning " + inst + " when looking for " + host + "." +
-                    " FirstRequestedHost=" + (prefHostCount == 0) +
-                    (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : ""));
-                return new SelectHostResult(inst, nodeInfo);
+              if (nodeInfo != null) {
+                if  (nodeInfo.canAcceptTask()) {
+                  // Successfully scheduled.
+                  LOG.info(
+                      "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+                          ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
+                          (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
+                              ""));
+                  return new SelectHostResult(inst, nodeInfo);
+                } else {
+                  // The node cannot accept a task at the moment.
+                  if (shouldDelayForLocality) {
+                    // Perform some checks on whether the node will become available or not.
+                    if (request.shouldForceLocality()) {
+                      requestedHostsWillBecomeAvailable = true;
+                    } else {
+                      if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() &&
+                          nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) {
+                        // This node will likely be activated after the task timeout expires.
+                      } else {
+                        // Worth waiting for the timeout.
+                        requestedHostsWillBecomeAvailable = true;
+                      }
+                    }
+                  }
+                }
+              } else {
+                LOG.warn(
+                    "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
+                    inst.getWorkerIdentity(), host);
+                // Leave requestedHostWillBecomeAvailable as is. If some other host is found - delay,
+                // else ends up allocating to a random host immediately.
               }
             }
           }
         }
         // Check if forcing the location is required.
-        if (forceLocation) {
-          if (requestedHostExists) {
+        if (shouldDelayForLocality) {
+          if (requestedHostsWillBecomeAvailable) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Skipping non-local location allocation for [" + request.task +
-                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]");
+                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" +
+                  ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" +
+                  request.getLocalityDelayTimeout());
             }
             return SELECT_HOST_RESULT_DELAYED_LOCALITY;
           } else {
@@ -729,10 +769,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         for (int i = 0; i < all.length; i++) {
           Entry<String, NodeInfo> inst = all[(i + n) % all.length];
           if (inst.getValue().canAcceptTask()) {
-            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
-                ", requestedHosts=" +
-                ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
-                    Arrays.toString(requestedHosts)));
+            LOG.info(
+                "Assigning " + nodeToString(inst.getValue().getServiceInstance(), inst.getValue()) +
+                    " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" +
+                    ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+                        Arrays.toString(requestedHosts)));
             return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue());
           }
         }
@@ -820,6 +861,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         tasksAtPriority = new LinkedList<>();
         pendingTasks.put(taskInfo.priority, tasksAtPriority);
       }
+      // Delayed tasks will not kick in right now. That will happen in the scheduling loop.
       tasksAtPriority.add(taskInfo);
       knownTasks.putIfAbsent(taskInfo.task, taskInfo);
       if (metrics != null) {
@@ -870,7 +912,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     try {
       TaskInfo taskInfo = knownTasks.remove(task);
       if (taskInfo != null) {
-        if (taskInfo.assigned) {
+        if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
           // Remove from the running list.
           int priority = taskInfo.priority.getPriority();
           Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
@@ -925,6 +967,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
           taskInfo.triedAssigningTask();
           ScheduleResult scheduleResult = scheduleTask(taskInfo);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+          }
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
@@ -938,6 +983,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             // Preempt only if there's no pending preemptions to avoid preempting twice for a task.
             String[] potentialHosts;
             if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+
+              // Add the task to the delayed task queue if it does not already exist.
+              maybeAddToDelayedTaskQueue(taskInfo);
+
+              // Try preempting a lower priority task in any case.
               // preempt only on specific hosts, if no preemptions already exist on those.
               potentialHosts = taskInfo.requestedHosts;
               //Protect against a bad location being requested.
@@ -1008,7 +1058,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       Container container =
           containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
               nsPair.getServiceInstance().getHost(),
-              nsPair.getServiceInstance().getRpcPort());
+              nsPair.getServiceInstance().getRpcPort(),
+              nsPair.getServiceInstance().getServicesAddress());
       writeLock.lock(); // While updating local structures
       try {
         LOG.info("Assigned task {} to container {}", taskInfo, container.getId());
@@ -1125,9 +1176,81 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
+    // There's no point adding a task with forceLocality set - since that will never exit the queue.
+    // Add other tasks if they are not already in the queue.
+    if (!taskInfo.shouldForceLocality() && !taskInfo.isInDelayedQueue()) {
+      taskInfo.setInDelayedQueue(true);
+      delayedTaskQueue.add(taskInfo);
+    }
+  }
+
+  private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+    return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
+        serviceInstance.getWorkerIdentity() + ", webAddress=" +
+        serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
+  }
+
+
+
+  // ------ Inner classes defined after this point ------
+
+  @VisibleForTesting
+  class DelayedTaskSchedulerCallable implements Callable<Void> {
+
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          TaskInfo taskInfo = getNextTask();
+          taskInfo.setInDelayedQueue(false);
+          // Tasks can exist in the delayed queue even after they have been scheduled.
+          // Trigger scheduling only if the task is still in PENDING state.
+          processEvictedTask(taskInfo);
+
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("DelayedTaskScheduler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown");
+            throw new RuntimeException(
+                "DelayedTaskScheduler thread interrupted without being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+
+    public TaskInfo getNextTask() throws InterruptedException {
+      return delayedTaskQueue.take();
+    }
+
+    public void processEvictedTask(TaskInfo taskInfo) {
+      if (shouldScheduleTask(taskInfo)) {
+        trySchedulingPendingTasks();
+      }
+    }
+
+    public boolean shouldScheduleTask(TaskInfo taskInfo) {
+      return taskInfo.getState() == TaskInfo.State.PENDING;
+    }
+  }
+
+  @VisibleForTesting
+  DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+    return new DelayedTaskSchedulerCallable();
+  }
+
   private class NodeEnablerCallable implements Callable<Void> {
 
-    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
     private static final long REFRESH_INTERVAL = 10000l;
     long nextPollInterval = REFRESH_INTERVAL;
     long lastRefreshTime;
@@ -1135,13 +1258,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     @Override
     public Void call() {
 
-      lastRefreshTime = System.currentTimeMillis();
+      lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           while (true) {
             NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
             if (nodeInfo != null) {
-              long currentTime = System.currentTimeMillis();
+              long currentTime =  LlapTaskSchedulerService.this.clock.getTime();
               // A node became available. Enable the node and try scheduling.
               reenableDisabledNode(nodeInfo);
               trySchedulingPendingTasks();
@@ -1152,7 +1275,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             if (nextPollInterval < 0 || nodeInfo == null) {
               // timeout expired. Reset the poll interval and refresh nodes.
               nextPollInterval = REFRESH_INTERVAL;
-              lastRefreshTime = System.currentTimeMillis();
+              lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
               // TODO Get rid of this polling once we have notificaitons from the registry sub-system
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Refreshing instances based on poll interval");
@@ -1232,6 +1355,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         // will be handled in the next run.
         // A new request may come in right after this is set to false, but before the actual scheduling.
         // This will be handled in this run, but will cause an immediate run after, which is harmless.
+        // This is mainly to handle a trySchedue request while in the middle of a run - since the event
+        // which triggered it may not be processed for all tasks in the run.
         pendingScheduleInvocations.set(false);
         // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
         schedulePendingTasks();
@@ -1245,6 +1370,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  // ------ Additional static classes defined after this point ------
+
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final NodeBlacklistConf blacklistConf;
@@ -1257,6 +1384,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     float cumulativeBackoffFactor = 1.0f;
 
     // Indicates whether a node had a recent communication failure.
+    // This is primarily for tracking and logging purposes for the moment.
+    // TODO At some point, treat task rejection and communication failures differently.
     private boolean hadCommFailure = false;
 
     // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
@@ -1375,6 +1504,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
     }
 
+    /**
+     * @return the time at which this node will be re-enabled
+     */
+    public long getEnableTime() {
+      return expireTimeMillis;
+    }
+
     public boolean isDisabled() {
       return disabled;
     }
@@ -1382,13 +1518,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     public boolean hadCommFailure() {
       return hadCommFailure;
     }
+
     /* Returning true does not guarantee that the task will run, considering other queries
     may be running in the system. Also depends upon the capacity usage configuration
      */
     public boolean canAcceptTask() {
       boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
-      LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive());
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+                serviceInstance.getWorkerIdentity() + "]: " +
+                "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
+            result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
+            serviceInstance.isAlive());
+      }
       return result;
     }
 
@@ -1512,11 +1655,23 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private static class TaskInfo {
+
+  // TODO There needs to be a mechanism to figure out different attempts for the same task. Delays
+  // could potentially be changed based on this.
+
+  @VisibleForTesting
+  static class TaskInfo implements Delayed {
+
+    enum State {
+      PENDING, ASSIGNED, PREEMPTED
+    }
+
     // IDs used to ensure two TaskInfos are different without using the underlying task instance.
     // Required for insertion into a TreeMap
     static final AtomicLong ID_GEN = new AtomicLong(0);
     final long uniqueId;
+    final LocalityDelayConf localityDelayConf;
+    final Clock clock;
     final Object task;
     final Object clientCookie;
     final Priority priority;
@@ -1524,19 +1679,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     final String[] requestedHosts;
     final String[] requestedRacks;
     final long requestTime;
+    final long localityDelayTimeout;
     long startTime;
     long preemptTime;
     ContainerId containerId;
     ServiceInstance assignedInstance;
-    private boolean assigned = false;
-    private boolean preempted = false;
+    private State state = State.PENDING;
+    boolean inDelayedQueue = false;
 
     private int numAssignAttempts = 0;
 
     // TaskInfo instances for two different tasks will not be the same. Only a single instance should
     // ever be created for a taskAttempt
-    public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
+    public TaskInfo(LocalityDelayConf localityDelayConf, Clock clock, Object task, Object clientCookie, Priority priority, Resource capability,
         String[] hosts, String[] racks, long requestTime) {
+      this.localityDelayConf = localityDelayConf;
+      this.clock = clock;
       this.task = task;
       this.clientCookie = clientCookie;
       this.priority = priority;
@@ -1544,30 +1702,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.requestedHosts = hosts;
       this.requestedRacks = racks;
       this.requestTime = requestTime;
+      if (localityDelayConf.getNodeLocalityDelay() == -1) {
+        localityDelayTimeout = Long.MAX_VALUE;
+      } else if (localityDelayConf.getNodeLocalityDelay() == 0) {
+        localityDelayTimeout = 0L;
+      } else {
+        localityDelayTimeout = requestTime + localityDelayConf.getNodeLocalityDelay();
+      }
       this.uniqueId = ID_GEN.getAndIncrement();
     }
 
-    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
+    synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
       this.assignedInstance = instance;
-        this.containerId = containerId;
+      this.containerId = containerId;
       this.startTime = startTime;
-      assigned = true;
+      this.state = State.ASSIGNED;
     }
 
-    void setPreemptedInfo(long preemptTime) {
-      this.preempted = true;
-      this.assigned = false;
+    synchronized void setPreemptedInfo(long preemptTime) {
+      this.state = State.PREEMPTED;
       this.preemptTime = preemptTime;
     }
 
-    void triedAssigningTask() {
+    synchronized void setInDelayedQueue(boolean val) {
+      this.inDelayedQueue = val;
+    }
+
+    synchronized void triedAssigningTask() {
       numAssignAttempts++;
     }
 
-    int getNumPreviousAssignAttempts() {
+    synchronized int getNumPreviousAssignAttempts() {
       return numAssignAttempts;
     }
 
+    synchronized State getState() {
+      return state;
+    }
+
+    synchronized boolean isInDelayedQueue() {
+      return inDelayedQueue;
+    }
+
+    boolean shouldDelayForLocality(long schedulerAttemptTime) {
+      // getDelay <=0 means the task will be evicted from the queue.
+      return localityDelayTimeout > schedulerAttemptTime;
+    }
+
+    boolean shouldForceLocality() {
+      return localityDelayTimeout == Long.MAX_VALUE;
+    }
+
+    long getLocalityDelayTimeout() {
+      return localityDelayTimeout;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -1602,8 +1791,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           ", containerId=" + containerId +
           ", assignedInstance=" + assignedInstance +
           ", uniqueId=" + uniqueId +
+          ", localityDelayTimeout=" + localityDelayTimeout +
           '}';
     }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(localityDelayTimeout - clock.getTime(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      TaskInfo other = (TaskInfo) o;
+      if (other.localityDelayTimeout > this.localityDelayTimeout) {
+        return -1;
+      } else if (other.localityDelayTimeout < this.localityDelayTimeout) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
   }
 
   // Newer tasks first.
@@ -1689,4 +1896,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           '}';
     }
   }
+
+  @VisibleForTesting
+  static final class LocalityDelayConf {
+    private final long nodeLocalityDelay;
+
+    public LocalityDelayConf(long nodeLocalityDelay) {
+      this.nodeLocalityDelay = nodeLocalityDelay;
+    }
+
+    public long getNodeLocalityDelay() {
+      return nodeLocalityDelay;
+    }
+
+    @Override
+    public String toString() {
+      return "LocalityDelayConf{" +
+          "nodeLocalityDelay=" + nodeLocalityDelay +
+          '}';
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
new file mode 100644
index 0000000..aaa6f91
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.util.Clock;
+
+public class MonotonicClock implements Clock {
+  @Override
+  public long getTime() {
+    return Time.monotonicNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
new file mode 100644
index 0000000..ea700da
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.scheduler;
+
+import java.util.concurrent.CancellationException;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.slf4j.Logger;
+
+public final class LoggingFutureCallback implements FutureCallback<Void> {
+  private final String componentName;
+  private final Logger LOG;
+
+  public LoggingFutureCallback(String componentName, Logger log) {
+    this.componentName = componentName;
+    LOG = log;
+  }
+
+  @Override
+  public void onSuccess(Void result) {
+    LOG.info("{} exited", componentName);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+    if (t instanceof CancellationException) {
+      LOG.info("{} was cancelled", componentName, t.getMessage());
+    } else {
+      LOG.warn("{} exited with error", componentName, t);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index b2cd55e..e4fe79c 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -15,7 +15,9 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -26,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -44,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -62,7 +66,7 @@ public class TestLlapTaskSchedulerService {
   private static final String HOST2 = "host2";
   private static final String HOST3 = "host3";
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testSimpleLocalAllocation() throws IOException, InterruptedException {
 
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -77,18 +81,17 @@ public class TestLlapTaskSchedulerService {
       tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
 
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
+      tsWrapper.awaitLocalTaskAllocations(1);
 
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
-      // TODO Verify this is on host1.
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
     } finally {
       tsWrapper.shutdown();
     }
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
 
@@ -99,8 +102,7 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie1 = new Object();
       tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
+      tsWrapper.awaitTotalTaskAllocations(1);
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
     } finally {
@@ -109,7 +111,7 @@ public class TestLlapTaskSchedulerService {
   }
 
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testPreemption() throws InterruptedException, IOException {
 
     Priority priority1 = Priority.newInstance(1);
@@ -174,7 +176,7 @@ public class TestLlapTaskSchedulerService {
 
   }
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testNodeDisabled() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
     try {
@@ -233,7 +235,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testNodeReEnabled() throws InterruptedException, IOException {
     // Based on actual timing.
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
@@ -307,7 +309,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testForceLocalityTest1() throws IOException, InterruptedException {
     // 2 hosts. 2 per host. 5 requests at the same priority.
     // First 3 on host1, Next at host2, Last with no host.
@@ -316,7 +318,7 @@ public class TestLlapTaskSchedulerService {
 
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
     // 2 hosts. 2 per host. 5 requests at the same priority.
     // First 3 on host1, Next at host2, Last with no host.
@@ -411,7 +413,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 10000)
   public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
 
@@ -454,15 +456,13 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-
-  @Test(timeout = 5000)
+  @Test(timeout = 10000)
   public void testForcedLocalityPreemption() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
     Priority priority2 = Priority.newInstance(2);
     String [] hosts = new String[] {HOST1, HOST2};
 
     String [] hostsH1 = new String[] {HOST1};
-    String [] hostsH2 = new String[] {HOST2};
 
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
 
@@ -485,13 +485,7 @@ public class TestLlapTaskSchedulerService {
       tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
       // This request at a lower priority should not affect anything.
       tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
-          break;
-        }
-      }
+      tsWrapper.awaitLocalTaskAllocations(2);
 
       verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
       ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
@@ -517,13 +511,8 @@ public class TestLlapTaskSchedulerService {
 
       tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
 
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
-          break;
-        }
-      }
+      tsWrapper.awaitLocalTaskAllocations(3);
+
       verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
           eq(clientCookie4), any(Container.class));
 
@@ -532,11 +521,471 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+    String[] hosts = new String[]{HOST1, HOST2};
+
+    String[] hostsH1 = new String[]{HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+    testNotInQueue(tsWrapper, hostsH1);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+    String[] hosts = new String[]{HOST1};
+
+    String[] hostsH1 = new String[]{HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 0l);
+    testNotInQueue(tsWrapper, hostsH1);
+  }
+
+  private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts) throws
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    try {
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(hosts, priority1);
+      tsWrapper.allocateTask(hosts, priority1);
+      tsWrapper.allocateTask(hosts, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      assertEquals(0, tsWrapper.ts.delayedTaskQueue.size());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // No capacity left on node1. The next task should be allocated to node2 after it times out.
+      clock.setTime(clock.getTime() + 10000l); // Past the timeout.
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+          delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Move the clock forward 2000ms, and check the delayed queue
+      clock.setTime(clock.getTime() + 2000l); // Past the timeout.
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      tsWrapper.deallocateTask(task1, true, null);
+
+      // Node1 now has free capacity. task1 should be allocated to it.
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(3, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedQueeTaskSelectionAfterScheduled() throws IOException,
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      // Simulate a 2s delay before finishing the task.
+      clock.setTime(clock.getTime() + 2000);
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet.
+      tsWrapper.deallocateTask(task1, true, null);
+      tsWrapper.awaitLocalTaskAllocations(3);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Move the clock forward and trigger a run.
+      clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+          delayedTaskSchedulerCallableControlled.lastState);
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+          !delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+      // Ensure there's no more invocations.
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testTaskInfoDelay() {
+
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf1 =
+        new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
+    clock.setTime(clock.getTime());
+
+
+    // With a timeout of 3000.
+    LlapTaskSchedulerService.TaskInfo taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf1, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    assertFalse(taskInfo.shouldForceLocality());
+
+    assertEquals(3000, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+    clock.setTime(clock.getTime() + 500);
+    assertEquals(2500, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+    clock.setTime(clock.getTime() + 2500);
+    assertEquals(0, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+
+    // No locality delay
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf2 =
+        new LlapTaskSchedulerService.LocalityDelayConf(0);
+    taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf2, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+    assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+    assertFalse(taskInfo.shouldForceLocality());
+    assertTrue(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+
+    // Force locality
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf3 =
+        new LlapTaskSchedulerService.LocalityDelayConf(-1);
+    taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf3, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+    assertTrue(taskInfo.shouldForceLocality());
+    assertFalse(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException {
+
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf =
+        new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
+    clock.setTime(clock.getTime());
+
+    DelayQueue<LlapTaskSchedulerService.TaskInfo> delayedQueue = new DelayQueue<>();
+
+    LlapTaskSchedulerService.TaskInfo taskInfo1 =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    clock.setTime(clock.getTime() + 1000);
+    LlapTaskSchedulerService.TaskInfo taskInfo2 =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    delayedQueue.add(taskInfo1);
+    delayedQueue.add(taskInfo2);
+
+    assertEquals(taskInfo1, delayedQueue.peek());
+  }
+
+  @Test (timeout = 15000)
+  public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    // Node disable timeout higher than locality delay.
+    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, 10000l);
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      long startTime = tsWrapper.getClock().getTime();
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Mark a task as failed due to a comm failure.
+      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+      // Node1 marked as failed, node2 has capacity.
+      // Timeout for nodes is larger than delay - immediate allocation
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      long thirdAllocateTime = tsWrapper.getClock().getTime();
+      long diff = thirdAllocateTime - startTime;
+      // diffAfterSleep < total sleepTime
+      assertTrue("Task not allocated in expected time window: duration=" + diff, diff < 10000l);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test (timeout = 15000)
+  public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Mark a task as failed due to a comm failure.
+      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+      // Node1 has free capacity but is disabled. Node 2 has capcaity. Delay > re-enable tiemout
+      tsWrapper.ensureNoChangeInTotalAllocations(2, 2000l);
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
   private static class TestTaskSchedulerServiceWrapper {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
     TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
-    ControlledClock clock = new ControlledClock(new SystemClock());
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
 
@@ -555,14 +1004,21 @@ public class TestLlapTaskSchedulerService {
       this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
     }
 
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
+    TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+                                    int waitQueueSize, long localityDelayMs) throws
+        IOException, InterruptedException {
+      this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false);
+    }
+
+    TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+                                    int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
         IOException, InterruptedException {
       conf = new Configuration();
       conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
       conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
       conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
       conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
-          disableTimeoutMillis + "ms");
+          nodeDisableTimeoutMillis + "ms");
       conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
       conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
 
@@ -571,7 +1027,11 @@ public class TestLlapTaskSchedulerService {
       UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
       doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
 
-      ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+      if (controlledDelayedTaskQueue) {
+        ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
+      } else {
+        ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+      }
 
       controlScheduler(true);
       ts.initialize();
@@ -582,6 +1042,10 @@ public class TestLlapTaskSchedulerService {
       awaitSchedulerRun();
     }
 
+    ControlledClock getClock() {
+      return clock;
+    }
+
     void controlScheduler(boolean val) {
       ts.forTestsetControlScheduling(val);
     }
@@ -591,8 +1055,19 @@ public class TestLlapTaskSchedulerService {
     }
 
     void awaitSchedulerRun() throws InterruptedException {
-      ts.forTestAwaitSchedulingRun();
+      ts.forTestAwaitSchedulingRun(-1);
+    }
+
+    /**
+     *
+     * @param timeoutMs
+     * @return false if the time elapsed
+     * @throws InterruptedException
+     */
+    boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException {
+      return ts.forTestAwaitSchedulingRun(timeoutMs);
     }
+
     void resetAppCallback() {
       reset(mockAppCallback);
     }
@@ -605,6 +1080,8 @@ public class TestLlapTaskSchedulerService {
       ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
     }
 
+
+
     void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
       ts.deallocateTask(task, succeeded, endReason, null);
     }
@@ -612,6 +1089,60 @@ public class TestLlapTaskSchedulerService {
     void rejectExecution(Object task) {
       ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
     }
+
+
+    // More complex methods which may wrap multiple operations
+    Object allocateTask(String[] hosts, Priority priority) {
+      Object task = new Object();
+      Object clientCookie = new Object();
+      allocateTask(task, hosts, priority, clientCookie);
+      return task;
+    }
+
+    public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numTotalAllocations == numTasks) {
+          break;
+        }
+      }
+    }
+
+    public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numLocalAllocations == numTasks) {
+          break;
+        }
+      }
+    }
+
+    public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numTotalAllocations > previousAllocations) {
+          break;
+        }
+        Thread.sleep(200l);
+      }
+    }
+
+    public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws
+        InterruptedException {
+      long startTime = Time.monotonicNow();
+      long timeLeft = timeout;
+      while (timeLeft > 0) {
+        signalSchedulerRun();
+        awaitSchedulerRun(Math.min(200, timeLeft));
+        if (ts.dagStats.numTotalAllocations != previousAllocations) {
+          throw new IllegalStateException("NumTotalAllocations expected to stay at " + previousAllocations + ". Actual=" + ts.dagStats.numTotalAllocations);
+        }
+        timeLeft = (startTime + timeout) - Time.monotonicNow();
+      }
+    }
   }
 
   private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
@@ -632,6 +1163,7 @@ public class TestLlapTaskSchedulerService {
 
     @Override
     protected void schedulePendingTasks() {
+      LOG.info("Attempted schedulPendingTasks");
       testLock.lock();
       try {
         if (controlScheduling.get()) {
@@ -668,17 +1200,143 @@ public class TestLlapTaskSchedulerService {
       }
     }
 
-    void forTestAwaitSchedulingRun() throws InterruptedException {
+    boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException {
       testLock.lock();
       try {
+        boolean success = true;
         while (!schedulingComplete) {
-          schedulingCompleteCondition.await();
+          if (timeout == -1) {
+            schedulingCompleteCondition.await();
+          } else {
+            success = schedulingCompleteCondition.await(timeout, TimeUnit.MILLISECONDS);
+            break;
+          }
         }
         schedulingComplete = false;
+        return success;
       } finally {
         testLock.unlock();
       }
     }
 
   }
+
+  private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest {
+
+    private DelayedTaskSchedulerCallableControlled controlledTSCallable;
+
+    public LlapTaskSchedulerServiceForTestControlled(
+        TaskSchedulerContext appClient, Clock clock) {
+      super(appClient, clock);
+    }
+
+    @Override
+    LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+      controlledTSCallable = new DelayedTaskSchedulerCallableControlled();
+      return controlledTSCallable;
+    }
+
+    class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable {
+      private final ReentrantLock lock = new ReentrantLock();
+      private final Condition triggerRunCondition = lock.newCondition();
+      private boolean shouldRun = false;
+      private final Condition runCompleteCondition = lock.newCondition();
+      private boolean runComplete = false;
+
+      static final int STATE_NOT_RUN = 0;
+      static final int STATE_NULL_FOUND = 1;
+      static final int STATE_TIMEOUT_NOT_EXPIRED = 2;
+      static final int STATE_RETURNED_TASK = 3;
+
+      volatile int lastState = STATE_NOT_RUN;
+
+      volatile boolean lastShouldScheduleTaskResult = false;
+      volatile boolean shouldScheduleTaskTriggered = false;
+
+      @Override
+      public void processEvictedTask(TaskInfo taskInfo) {
+        super.processEvictedTask(taskInfo);
+        signalRunComplete();
+      }
+
+      @Override
+      public TaskInfo getNextTask() throws InterruptedException {
+
+        while (true) {
+          lock.lock();
+          try {
+            while (!shouldRun) {
+              triggerRunCondition.await();
+            }
+            // Preven subsequent runs until a new trigger is set.
+            shouldRun = false;
+          } finally {
+            lock.unlock();
+          }
+          TaskInfo taskInfo = delayedTaskQueue.peek();
+          if (taskInfo == null) {
+            LOG.info("Triggered getTask but the queue is empty");
+            lastState = STATE_NULL_FOUND;
+            signalRunComplete();
+            continue;
+          }
+          if (taskInfo.shouldDelayForLocality(
+              LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) {
+            LOG.info("Triggered getTask but the first element is not ready to execute");
+            lastState = STATE_TIMEOUT_NOT_EXPIRED;
+            signalRunComplete();
+            continue;
+          } else {
+            delayedTaskQueue.poll(); // Remove the previously peeked element.
+            lastState = STATE_RETURNED_TASK;
+            return taskInfo;
+          }
+        }
+      }
+
+      @Override
+      public boolean shouldScheduleTask(TaskInfo taskInfo) {
+        shouldScheduleTaskTriggered = true;
+        lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo);
+        return lastShouldScheduleTaskResult;
+      }
+
+      void resetShouldScheduleInformation() {
+        shouldScheduleTaskTriggered = false;
+        lastShouldScheduleTaskResult = false;
+      }
+
+      private void signalRunComplete() {
+        lock.lock();
+        try {
+          runComplete = true;
+          runCompleteCondition.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      void triggerGetNextTask() {
+        lock.lock();
+        try {
+          shouldRun = true;
+          triggerRunCondition.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      void awaitGetNextTaskProcessing() throws InterruptedException {
+        lock.lock();
+        try {
+          while (!runComplete) {
+            runCompleteCondition.await();
+          }
+          runComplete = false;
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+  }
 }