Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 20:59:20 UTC

[52/70] [abbrv] hive git commit: HIVE-11687. TaskExecutorService can reject work even if capacity is available. (Siddharth Seth, reviewed by Prasanth Jayachandran)

HIVE-11687. TaskExecutorService can reject work even if capacity is available. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29e671ed
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29e671ed
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29e671ed

Branch: refs/heads/hive-14535
Commit: 29e671ed9608ce1577b3b16b0290d022bf39d950
Parents: 8230b57
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 6 13:23:58 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 6 13:23:58 2017 -0800

----------------------------------------------------------------------
 .../SchedulerFragmentCompletingListener.java    |  29 +++++
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   6 +-
 .../impl/EvictingPriorityBlockingQueue.java     |  69 +++++++---
 .../hive/llap/daemon/impl/LlapTaskReporter.java |  38 +++++-
 .../llap/daemon/impl/PriorityBlockingDeque.java |   4 +
 .../llap/daemon/impl/TaskExecutorService.java   | 111 ++++++++++++++--
 .../llap/daemon/impl/TaskRunnerCallable.java    |  24 +++-
 .../llap/metrics/LlapDaemonExecutorInfo.java    |  10 +-
 .../llap/metrics/LlapDaemonExecutorMetrics.java |  64 +++++++++-
 .../daemon/impl/TaskExecutorTestHelpers.java    |  17 ++-
 .../impl/TestEvictingPriorityBlockingQueue.java | 128 +++++++++++++++++++
 .../daemon/impl/TestTaskExecutorService.java    |  63 ++++++++-
 .../TestFirstInFirstOutComparator.java          |  90 ++++++-------
 .../TestShortestJobFirstComparator.java         |  92 ++++++-------
 14 files changed, 608 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
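
The core of this change: the wait queue's admission check now counts free executor slots as extra
capacity, so a fragment is only rejected when both the wait queue and the executors are genuinely
full. Roughly, a new fragment is accepted while queueSize < waitQueueSize + (numExecutors -
runningFragmentCount), where runningFragmentCount is decremented as soon as a fragment reports
that it is completing, rather than when its executor thread actually returns. As a worked example
matching the new testWaitQueueAcceptAfterAMTaskReport below: with numExecutors=1 and
waitQueueSize=2, one fragment runs and two wait, so a fourth submission is rejected; once the
running fragment reports completing, runningFragmentCount drops to 0 and the fourth submission is
accepted, leaving three entries in the wait queue.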


http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
new file mode 100644
index 0000000..9f580f6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+public interface SchedulerFragmentCompletingListener {
+
+  enum State {
+    SUCCESS, FAILED, KILLED
+  }
+
+  /**
+   * Indicates that a fragment is about to complete.
+   * @param fragmentId the id of the fragment that is about to complete
+   */
+  void fragmentCompleting(String fragmentId, State state);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 8c33fa2..6908138 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -93,6 +94,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final AMReporter amReporter;
   private final QueryTracker queryTracker;
   private final Scheduler<TaskRunnerCallable> executorService;
+  private final SchedulerFragmentCompletingListener completionListener;
   private final AtomicReference<InetSocketAddress> localAddress;
   private final AtomicReference<Integer> localShufflePort;
   private final Map<String, String> localEnv = new HashMap<>();
@@ -129,6 +131,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
     this.executorService = new TaskExecutorService(numExecutors, waitQueueSize,
         waitQueueSchedulerClassName, enablePreemption, classLoader, metrics);
+    completionListener = (SchedulerFragmentCompletingListener) executorService;
 
     addIfService(executorService);
 
@@ -251,7 +254,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
           new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
-          this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi);
+          this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi,
+          completionListener);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index adc86ea..a80bb9b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -36,27 +36,28 @@ public class EvictingPriorityBlockingQueue<E> {
 
   private final PriorityBlockingDeque<E> deque;
   private final Comparator<E> comparator;
+  private final int waitQueueSize;
+
+  private int currentSize = 0;
 
   public EvictingPriorityBlockingQueue(Comparator<E> comparator, int maxSize) {
-    this.deque = new PriorityBlockingDeque<>(comparator, maxSize);
+    this.deque = new PriorityBlockingDeque<>(comparator);
+    this.waitQueueSize = maxSize;
     this.comparator = comparator;
   }
 
-  public synchronized E offer(E e) {
-    if (deque.offer(e)) {
+  public synchronized E offer(E e, int additionalElementsAllowed) {
+    if (currentSize < waitQueueSize + additionalElementsAllowed) {
+      // Capacity exists.
+      offerToDequeueInternal(e);
+      currentSize++;
       return null;
     } else {
+      // No capacity. Check if an element needs to be evicted.
       E last = deque.peekLast();
       if (comparator.compare(e, last) < 0) {
         deque.removeLast();
-        if (!deque.offer(e)) {
-          LOG.error(
-              "Failed to insert element into queue with capacity available. size={}, element={}",
-              size(), e);
-          throw new RuntimeException(
-              "Failed to insert element into queue with capacity available. size=" +
-                  size());
-        }
+        offerToDequeueInternal(e);
         return last;
       }
       return e;
@@ -64,7 +65,7 @@ public class EvictingPriorityBlockingQueue<E> {
   }
 
   public synchronized boolean isEmpty() {
-    return deque.isEmpty();
+    return currentSize == 0;
   }
 
   public synchronized E peek() {
@@ -72,19 +73,55 @@ public class EvictingPriorityBlockingQueue<E> {
   }
 
   public synchronized E take() throws InterruptedException {
-    return deque.take();
+    E e = deque.take();
+    currentSize--; // Decrement after take() actually returns an element.
+    return e;
   }
 
   public synchronized boolean remove(E e) {
-    return deque.remove(e);
+    boolean removed = deque.remove(e);
+    if (removed) {
+      currentSize--;
+    }
+    return removed;
+  }
+
+  /**
+   * Re-insert an element if it already exists in the queue (mainly to force a re-order).
+   * @param e the element to re-insert
+   * @return false if the element was not found, true otherwise
+   */
+  public synchronized boolean reinsertIfExists(E e) {
+    if (remove(e)) {
+      offerToDequeueInternal(e);
+      currentSize++;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private void offerToDequeueInternal(E e) {
+    boolean result = deque.offer(e);
+    if (!result) {
+      LOG.error(
+          "Failed to insert element into queue with capacity available. size={}, element={}",
+          size(), e);
+      throw new RuntimeException(
+          "Failed to insert element into queue with capacity available. size=" +
+              size());
+    }
   }
 
   public synchronized int size() {
-    return deque.size();
+    return currentSize;
   }
 
   @Override
   public synchronized String toString() {
-    return deque.toString();
+    StringBuilder sb = new StringBuilder();
+    sb.append("currentSize=").append(size()).append(", queue=")
+        .append(deque.toString());
+    return sb.toString();
   }
 }
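
The new offer(e, additionalElementsAllowed) contract is easiest to read with a small usage sketch.
This mirrors the behaviour exercised by the new TestEvictingPriorityBlockingQueue further down; it
is illustrative only and assumes the llap-server classes are on the classpath (Integer elements
with a natural-order comparator, so a smaller value means higher priority here):

    import java.util.Comparator;

    import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;

    public class OfferSemanticsSketch {
      public static void main(String[] args) {
        // Base capacity of 2 elements; smaller Integer = higher priority with naturalOrder().
        EvictingPriorityBlockingQueue<Integer> q =
            new EvictingPriorityBlockingQueue<>(Comparator.<Integer>naturalOrder(), 2);

        System.out.println(q.offer(5, 0)); // null (accepted)
        System.out.println(q.offer(1, 0)); // null (accepted)
        System.out.println(q.offer(3, 0)); // 5    (queue full; lowest-priority element 5 evicted)
        System.out.println(q.offer(9, 0)); // 9    (queue full; the offered element itself is rejected)
        System.out.println(q.offer(9, 1)); // null (one extra slot allowed, e.g. a free executor)
        System.out.println(q.size());      // 3    (base capacity 2 plus 1 additional element)
      }
    }

The return value therefore distinguishes three outcomes: null (accepted), some other element
(accepted, with that element evicted), or the offered element itself (rejected).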

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 39b4b0e..2fe1017 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
@@ -82,15 +83,19 @@ public class LlapTaskReporter implements TaskReporterInterface {
   private final String containerIdStr;
   private final String fragmentId;
   private final TezEvent initialEvent;
+  private final SchedulerFragmentCompletingListener completionListener;
+  // The same id as reported by TaskRunnerCallable.getRequestId
+  private final String fragmentRequestId;
 
   private final ListeningExecutorService heartbeatExecutor;
 
   @VisibleForTesting
   HeartbeatCallable currentCallable;
 
-  public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
+  public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
                       long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
-      String containerIdStr, final String fragmentId, TezEvent initialEvent) {
+      String containerIdStr, final String fragmentId, TezEvent initialEvent,
+                          String fragmentRequestId) {
     this.umbilical = umbilical;
     this.pollInterval = amPollInterval;
     this.sendCounterInterval = sendCounterInterval;
@@ -102,6 +107,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
     ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
     heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+    this.completionListener = completionListener;
+    this.fragmentRequestId = fragmentRequestId;
   }
 
   /**
@@ -113,8 +120,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
     TezCounters tezCounters = task.addAndGetTezCounter(fragmentId);
     FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters);
     LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName());
-    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
-        maxEventsToGet, requestCounter, containerIdStr, initialEvent);
+    currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId);
     ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
     Futures.addCallback(future, new HeartbeatCallback(errorReporter));
   }
@@ -144,6 +151,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
 
     private final RuntimeTask task;
     private final EventMetaData updateEventMetadata;
+    private final SchedulerFragmentCompletingListener completionListener;
+    private final String fragmentRequestId;
 
     private final LlapTaskUmbilicalProtocol umbilical;
 
@@ -174,9 +183,12 @@ public class LlapTaskReporter implements TaskReporterInterface {
     private int prevCounterSendHeartbeatNum = 0;
     private TezEvent initialEvent;
 
-    public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical,
+    public HeartbeatCallable(
+        SchedulerFragmentCompletingListener completionListener,
+        RuntimeTask task, LlapTaskUmbilicalProtocol umbilical,
         long amPollInterval, long sendCounterInterval, int maxEventsToGet,
-        AtomicLong requestCounter, String containerIdStr, TezEvent initialEvent) {
+        AtomicLong requestCounter, String containerIdStr,
+        TezEvent initialEvent, String fragmentRequestId) {
 
       this.pollInterval = amPollInterval;
       this.sendCounterInterval = sendCounterInterval;
@@ -184,6 +196,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
       this.requestCounter = requestCounter;
       this.containerIdStr = containerIdStr;
       this.initialEvent = initialEvent;
+      this.completionListener = completionListener;
+      this.fragmentRequestId = fragmentRequestId;
 
       this.task = task;
       this.umbilical = umbilical;
@@ -367,6 +381,10 @@ public class LlapTaskReporter implements TaskReporterInterface {
         TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
         TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
             updateEventMetadata);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Invoking OOB heartbeat for successful attempt: {}, isTaskDone={}", taskAttemptID, task.isTaskDone());
+        }
+        completionListener.fragmentCompleting(fragmentRequestId, SchedulerFragmentCompletingListener.State.SUCCESS);
         return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
       } else {
         LOG.warn("A final task state event has already been sent. Not sending again");
@@ -431,6 +449,14 @@ public class LlapTaskReporter implements TaskReporterInterface {
           // Counter may exceed limitation
           LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out");
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Invoking OOB heartbeat for failed/killed attempt: {}, isTaskDone={}, isKilled={}",
+              taskAttemptID, task.isTaskDone(), isKilled);
+        }
+        completionListener.fragmentCompleting(fragmentRequestId,
+            isKilled ? SchedulerFragmentCompletingListener.State.KILLED :
+                SchedulerFragmentCompletingListener.State.FAILED);
         return !heartbeat(tezEvents).shouldDie;
       } else {
         LOG.warn("A final task state event has already been sent. Not sending again");
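
The essential ordering added in this file: the local scheduler is told that a fragment is
completing immediately before the final out-of-band heartbeat is sent to the AM, so the executor
slot can already be treated as (almost) free while the heartbeat and task teardown are still in
flight. An illustrative-only, standalone sketch of that ordering (the Runnable is a stand-in for
the real umbilical heartbeat call, not part of the actual code):

    import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
    import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener.State;

    public class CompletionOrderingSketch {
      private final SchedulerFragmentCompletingListener completionListener;
      private final String fragmentRequestId;

      public CompletionOrderingSketch(SchedulerFragmentCompletingListener completionListener,
                                      String fragmentRequestId) {
        this.completionListener = completionListener;
        this.fragmentRequestId = fragmentRequestId;
      }

      public void reportFinalState(State state, Runnable sendFinalHeartbeatToAM) {
        // 1. Tell the TaskExecutorService this fragment is completing, so its executor slot
        //    can already be counted as additional wait-queue capacity.
        completionListener.fragmentCompleting(fragmentRequestId, state);
        // 2. Only then send the final (out-of-band) heartbeat to the AM.
        sendFinalHeartbeatToAM.run();
      }
    }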

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
index e27efa5..3bf51cd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
@@ -81,6 +81,10 @@ public class PriorityBlockingDeque<E>
     this(null, capacity);
   }
 
+  public PriorityBlockingDeque(Comparator<E> comparator) {
+    this(comparator, Integer.MAX_VALUE);
+  }
+
   public PriorityBlockingDeque(Comparator<E> comparator, int capacity) {
     if (capacity <= 0) throw new IllegalArgumentException();
     this.capacity = capacity;

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 58863af..dcad3d8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -42,10 +42,13 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -78,7 +81,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * Task executor service can be shut down which will terminated all running tasks and reject all
  * new tasks. Shutting down of the task executor service can be done gracefully or immediately.
  */
-public class TaskExecutorService extends AbstractService implements Scheduler<TaskRunnerCallable> {
+public class TaskExecutorService extends AbstractService
+    implements Scheduler<TaskRunnerCallable>, SchedulerFragmentCompletingListener {
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -89,7 +93,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
   // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
-  private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
+  @VisibleForTesting
+  final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
   // Thread pool for taking entities off the wait queue.
   private final ListeningExecutorService waitQueueExecutorService;
   // Thread pool for callbacks on completion of execution of a work unit.
@@ -100,6 +105,13 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
+  private final int maxParallelExecutors;
+  private final Clock clock = new MonotonicClock();
+
+  // Tracks fragments which are running but have not yet reported that they are completing.
+  // The distinction matters because there is a race between the AM being notified, the task
+  // actually falling off the executor, and the executor service being ready to schedule a new task.
+  private final AtomicInteger runningFragmentCount = new AtomicInteger(0);
 
 
   @VisibleForTesting
@@ -121,6 +133,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
     final Comparator<TaskWrapper> waitQueueComparator = createComparator(
         waitQueueComparatorClassName);
+    this.maxParallelExecutors = numExecutors;
     this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
         numExecutors, // max pool size
@@ -344,8 +357,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
       // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the
       // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
       // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={} ",
+            waitQueue.size(), numSlotsAvailable.get(),
+            runningFragmentCount.get());
+      }
+
       canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
-      evictedTask = waitQueue.offer(taskWrapper);
+      evictedTask = waitQueue.offer(taskWrapper, maxParallelExecutors - runningFragmentCount.get());
       // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
 
       // null evicted task means offer accepted
@@ -366,7 +386,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         }
       } else {
         if (isInfoEnabled) {
-          LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId());
+          LOG.info(
+              "wait queue full, size={}. numSlotsAvailable={}, runningFragmentCount={}. {} not added",
+              waitQueue.size(), numSlotsAvailable.get(), runningFragmentCount.get(), task.getRequestId());
         }
         evictedTask.getTaskRunnerCallable().killTask();
 
@@ -473,6 +495,34 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     }
   }
 
+  private static final class FragmentCompletion {
+
+    public FragmentCompletion(
+        State state, long completingTime) {
+      this.state = state;
+      this.completingTime = completingTime;
+    }
+
+    State state;
+    long completingTime;
+  }
+
+  @VisibleForTesting
+  final ConcurrentMap<String, FragmentCompletion>
+      completingFragmentMap = new ConcurrentHashMap<>();
+
+  @Override
+  public void fragmentCompleting(String fragmentId, State state) {
+    int count = runningFragmentCount.decrementAndGet();
+    if (count < 0) {
+      LOG.warn(
+          "RunningFragmentCount went negative. Multiple calls for the same completion. Resetting to 0");
+      runningFragmentCount.set(0);
+    }
+    completingFragmentMap
+        .put(fragmentId, new FragmentCompletion(state, clock.getTime()));
+  }
+
   @VisibleForTesting
   void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
 
@@ -481,6 +531,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         LOG.info("Attempting to execute {}", taskWrapper);
         ListenableFuture<TaskRunner2Result> future = executorService.submit(
             taskWrapper.getTaskRunnerCallable());
+        runningFragmentCount.incrementAndGet();
         taskWrapper.setIsInWaitQueue(false);
         FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(
             taskWrapper);
@@ -547,10 +598,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         // Re-order the wait queue
         LOG.debug("Re-ordering the wait queue since {} finishable state moved to {}",
             taskWrapper.getRequestId(), newFinishableState);
-        if (waitQueue.remove(taskWrapper)) {
-          // Put element back only if it existed.
-          waitQueue.offer(taskWrapper);
-        } else {
+        boolean reInserted = waitQueue.reinsertIfExists(taskWrapper);
+        if (!reInserted) {
           LOG.warn("Failed to remove {} from waitQueue",
               taskWrapper.getTaskRunnerCallable().getRequestId());
         }
@@ -653,6 +702,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received successful completion for: {}",
+            taskWrapper.getRequestId());
+      }
+      updateFallOffStats(taskWrapper.getRequestId());
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
@@ -662,6 +716,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
     @Override
     public void onFailure(Throwable t) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received failed completion for: {}",
+            taskWrapper.getRequestId());
+      }
+      updateFallOffStats(taskWrapper.getRequestId());
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
@@ -698,6 +757,42 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
       }
     }
 
+    private void updateFallOffStats(
+        String requestId) {
+      long now = clock.getTime();
+      FragmentCompletion fragmentCompletion =
+          completingFragmentMap.remove(requestId);
+      if (fragmentCompletion == null) {
+        LOG.warn(
+            "Received onSuccess/onFailure for a fragment for which a completing message was not received: {}",
+            requestId);
+        // Happens due to AM side pre-emption, or the AM asking for a task to die.
+        // There are no hooks at the moment to get this information over.
+        // For now - decrement the count to avoid accounting errors.
+        runningFragmentCount.decrementAndGet();
+        // TODO: Extend TaskRunner2 or see if an API with callbacks will work
+      } else {
+        long timeTaken = now - fragmentCompletion.completingTime;
+        switch (fragmentCompletion.state) {
+          case SUCCESS:
+            if (metrics != null) {
+              metrics.addMetricsFallOffSuccessTimeLost(timeTaken);
+            }
+            break;
+          case FAILED:
+            if (metrics != null) {
+              metrics.addMetricsFallOffFailedTimeLost(timeTaken);
+            }
+            break;
+          case KILLED:
+            if (metrics != null) {
+              metrics.addMetricsFallOffKilledTimeLost(timeTaken);
+            }
+            break;
+        }
+      }
+    }
+
   }
 
   public void shutDown(boolean awaitTermination) {
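
Pulling the pieces of this file together, the sketch below shows the new bookkeeping in
simplified, standalone form (it uses System.nanoTime() and plain fields instead of the
MonotonicClock, metrics and wait queue used above, and is not the actual implementation):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class FallOffBookkeepingSketch {
      enum State { SUCCESS, FAILED, KILLED }

      private final int maxParallelExecutors;
      private final AtomicInteger runningFragmentCount = new AtomicInteger(0);
      private final ConcurrentMap<String, Long> completingSince = new ConcurrentHashMap<>();

      public FallOffBookkeepingSketch(int maxParallelExecutors) {
        this.maxParallelExecutors = maxParallelExecutors;
      }

      // Extra wait-queue capacity: executor slots not occupied by still-running fragments.
      public int additionalElementsAllowed() {
        return maxParallelExecutors - runningFragmentCount.get();
      }

      // Called when a fragment is handed to an executor thread.
      public void fragmentStarted() {
        runningFragmentCount.incrementAndGet();
      }

      // Called when a fragment reports (via the task reporter) that it is about to complete.
      public void fragmentCompleting(String fragmentId, State state) {
        if (runningFragmentCount.decrementAndGet() < 0) {
          runningFragmentCount.set(0); // duplicate notification; clamp, as the real code does
        }
        completingSince.put(fragmentId, System.nanoTime());
      }

      // Called when the executor thread actually returns. Returns the "fall-off" time in
      // nanoseconds (how long the slot was still held after informing the AM), or -1 if no
      // completing notification was seen (e.g. AM-side pre-emption).
      public long fragmentFinished(String fragmentId) {
        Long since = completingSince.remove(fragmentId);
        if (since == null) {
          runningFragmentCount.decrementAndGet();
          return -1;
        }
        return System.nanoTime() - since;
      }
    }

In the real class, the gap returned on completion is what feeds the new ExecutorFallOff* counters
and gauges in LlapDaemonExecutorMetrics.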

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index bfb155a..4b677aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -114,16 +115,17 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final AtomicBoolean killInvoked = new AtomicBoolean(false);
   private final SignableVertexSpec vertex;
   private final TezEvent initialEvent;
+  private final SchedulerFragmentCompletingListener completionListener;
   private UserGroupInformation taskUgi;
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
-      Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
-      Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
-      LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
-      FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
-      TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
-      UserGroupInformation taskUgi) {
+                            Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
+                            Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
+                            LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
+                            FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
+                            TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
+                            UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -152,6 +154,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.tezHadoopShim = tezHadoopShim;
     this.initialEvent = initialEvent;
     this.taskUgi = taskUgi;
+    this.completionListener = completionListener;
   }
 
   public long getStartTime() {
@@ -219,6 +222,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
       String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
       taskReporter = new LlapTaskReporter(
+          completionListener,
           umbilical,
           confParams.amHeartbeatIntervalMsMax,
           confParams.amCounterHeartbeatInterval,
@@ -226,7 +230,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
           new AtomicLong(0),
           request.getContainerIdString(),
           fragmentId,
-          initialEvent);
+          initialEvent,
+          requestId);
 
       String attemptId = fragmentInfo.getFragmentIdentifierString();
       IOContextMap.setThreadAttemptId(attemptId);
@@ -297,9 +302,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             killtimerWatch.start();
             LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
             boolean killed = taskRunner.killTask();
+
             if (killed) {
               // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
               LOG.info("Kill request for task {} completed. Informing AM", ta);
+              // Inform the scheduler that this fragment has been killed.
+              // If the kill failed, the task has already hit a final condition,
+              // and a notification will come from the LlapTaskReporter.
+              completionListener.fragmentCompleting(getRequestId(), SchedulerFragmentCompletingListener.State.KILLED);
               reportTaskKilled();
             } else {
               LOG.info("Kill request for task {} did not complete because the task is already complete",

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index db5fd4f..69d1c6f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -51,7 +51,15 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo {
   ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"),
   ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"),
   ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"),
-  ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority");
+  ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"),
+  ExecutorFallOffSuccessTimeLost("Total time lost in an executor completing after informing the AM - successful fragments"),
+  ExecutorFallOffSuccessMaxTimeLost("Max value of time lost in an executor completing after informing the AM - successful fragments"),
+  ExecutorFallOffFailedTimeLost("Total time lost in an executor completing after informing the AM - failed fragments"),
+  ExecutorFallOffFailedMaxTimeLost("Max value of time lost in an executor completing after informing the AM - failed fragments"),
+  ExecutorFallOffKilledTimeLost("Total time lost in an executor completing after informing the AM - killed fragments"),
+  ExecutorFallOffKilledMaxTimeLost("Max value of time lost in an executor completing after informing the AM - killed fragments"),
+  ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t falloff values"),
+  ;
 
   private final String desc;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 92c8913..7a0ecc9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.metrics;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffNumCompletedFragments;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost;
@@ -41,6 +42,12 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.Executo
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessMaxTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedMaxTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledMaxTimeLost;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -83,6 +90,10 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   private long maxTimeLost = Long.MIN_VALUE;
   private long maxTimeToKill = Long.MIN_VALUE;
 
+  private long fallOffMaxSuccessTimeLostLong = 0L;
+  private long fallOffMaxFailedTimeLostLong = 0L;
+  private long fallOffMaxKilledTimeLostLong = 0L;
+
   private final Map<String, Integer> executorNames;
 
   final MutableGaugeLong[] executorThreadCpuTime;
@@ -126,6 +137,23 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   @Metric
   final MutableQuantiles[] percentileTimeLost;
 
+  @Metric
+  MutableCounterLong fallOffNumCompletedFragments;
+  @Metric
+  MutableCounterLong fallOffSuccessTimeLost;
+  @Metric
+  MutableCounterLong fallOffFailedTimeLost;
+  @Metric
+  MutableCounterLong fallOffKilledTimeLost;
+  @Metric
+  MutableGaugeLong fallOffMaxSuccessTimeLost;
+  @Metric
+  MutableGaugeLong fallOffMaxFailedTimeLost;
+  @Metric
+  MutableGaugeLong fallOffMaxKilledTimeLost;
+
+
+
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
       int numExecutors, final int[] intervals) {
     this.name = displayName;
@@ -244,6 +272,33 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
     }
   }
 
+  public void addMetricsFallOffSuccessTimeLost(long timeLost) {
+    fallOffNumCompletedFragments.incr();
+    fallOffSuccessTimeLost.incr(timeLost);
+    if (timeLost > fallOffMaxSuccessTimeLostLong) {
+      fallOffMaxSuccessTimeLostLong = timeLost;
+      fallOffMaxSuccessTimeLost.set(timeLost);
+    }
+  }
+
+  public void addMetricsFallOffFailedTimeLost(long timeLost) {
+    fallOffNumCompletedFragments.incr();
+    fallOffFailedTimeLost.incr(timeLost);
+    if (timeLost > fallOffMaxFailedTimeLostLong) {
+      fallOffMaxFailedTimeLostLong = timeLost;
+      fallOffMaxFailedTimeLost.set(timeLost);
+    }
+  }
+
+  public void addMetricsFallOffKilledTimeLost(long timeLost) {
+    fallOffNumCompletedFragments.incr();
+    fallOffKilledTimeLost.incr(timeLost);
+    if (timeLost > fallOffMaxKilledTimeLostLong) {
+      fallOffMaxKilledTimeLostLong = timeLost;
+      fallOffMaxKilledTimeLost.set(timeLost);
+    }
+  }
+
   public void incrExecutorTotalKilled() {
     executorTotalIKilled.incr();
   }
@@ -292,7 +347,14 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
         .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value())
         .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value())
         .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value())
-        .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value());
+        .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value())
+        .addCounter(ExecutorFallOffSuccessTimeLost, fallOffSuccessTimeLost.value())
+        .addGauge(ExecutorFallOffSuccessMaxTimeLost, fallOffMaxSuccessTimeLost.value())
+        .addCounter(ExecutorFallOffFailedTimeLost, fallOffFailedTimeLost.value())
+        .addGauge(ExecutorFallOffFailedMaxTimeLost, fallOffMaxFailedTimeLost.value())
+        .addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value())
+        .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value())
+        .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value());
 
     for (MutableQuantiles q : percentileTimeToKill) {
       q.snapshot(rb, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 6506d07..5dc1be5 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -54,6 +55,18 @@ public class TaskExecutorTestHelpers {
     return createMockRequest(canFinish, workTime, request);
   }
 
+  public static MockRequest createMockRequest(int fragmentNum, int parallelism,
+                                              int withinDagPriority,
+                                              long firstAttemptStartTime,
+                                              long currentAttemptStartTime,
+                                              boolean canFinish,
+                                              long workTime) {
+    SubmitWorkRequestProto
+        request = createSubmitWorkRequestProto(fragmentNum, parallelism, 0,
+        firstAttemptStartTime, currentAttemptStartTime, withinDagPriority);
+    return createMockRequest(canFinish, workTime, request);
+  }
+
   private static MockRequest createMockRequest(boolean canFinish,
       long workTime, SubmitWorkRequestProto request) {
     QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(
@@ -170,7 +183,8 @@ public class TaskExecutorTestHelpers {
               LlapDaemonExecutorMetrics.class),
           mock(KilledTaskHandler.class), mock(
               FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
-              requestProto.getWorkSpec().getVertex(), initialEvent, null);
+              requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
+              SchedulerFragmentCompletingListener.class));
       this.workTime = workTime;
       this.canFinish = canFinish;
     }
@@ -285,4 +299,5 @@ public class TaskExecutorTestHelpers {
     logInfo(message, null);
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
new file mode 100644
index 0000000..62407b5
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Comparator;
+
+import org.junit.Test;
+
+public class TestEvictingPriorityBlockingQueue {
+
+  @Test (timeout = 10000)
+  public void test() throws InterruptedException {
+    Element e;
+    EvictingPriorityBlockingQueue<Element> queue = new EvictingPriorityBlockingQueue<>(new ElementComparator(), 3);
+
+    Element[] elements = new Element[10];
+    for (int i = 0 ; i < elements.length ; i++) {
+      elements[i] = new Element(i);
+    }
+
+    assertNull(queue.offer(elements[0], 0));
+    assertNull(queue.offer(elements[1], 0));
+    assertNull(queue.offer(elements[2], 0));
+    e = queue.offer(elements[3], 0);
+    assertEquals(elements[0], e);
+
+    e = queue.offer(elements[0], 0);
+    assertEquals(elements[0], e);
+    // 1,2,3
+
+    e = queue.offer(elements[4], 0);
+    assertEquals(elements[1], e);
+    //2,3,4
+
+    e = queue.offer(elements[1], 1);
+    assertNull(e);
+    assertEquals(4, queue.size());
+    // 1,2,3,4
+
+    e = queue.take();
+    assertEquals(elements[4], e); //Highest priority at this point should have come out.
+    //1,2,3
+
+    e = queue.offer(elements[4], 1);
+    assertNull(e);
+    //1,2,3,4
+
+    e = queue.offer(elements[0], 1);
+    assertEquals(elements[0], e); // Rejected
+    //1,2,3,4
+
+    assertTrue(queue.reinsertIfExists(elements[2]));
+    assertEquals(4, queue.size());
+
+    assertFalse(queue.reinsertIfExists(elements[5]));
+    assertEquals(4, queue.size());
+
+    //1,2,3,4
+
+    e = queue.offer(elements[5], 1);
+    assertEquals(elements[1], e);
+    //2,3,4,5
+
+    assertNull(queue.offer(elements[1], 2));
+
+    assertNull(queue.offer(elements[6], 5));
+    assertNull(queue.offer(elements[7], 5));
+    //1,2,3,4,5,6,7
+
+    assertEquals(7, queue.size());
+  }
+
+  private static class Element {
+    public Element(int x) {
+      this.x = x;
+    }
+
+    int x;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      Element element = (Element) o;
+
+      return x == element.x;
+    }
+
+    @Override
+    public int hashCode() {
+      return x;
+    }
+  }
+
+  private static class ElementComparator implements Comparator<Element> {
+
+    @Override
+    public int compare(Element o1,
+                       Element o2) {
+      return o2.x - o1.x;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index de7f2fc..bf7d1d8 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest;
 import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator;
@@ -174,12 +175,65 @@ public class TestTaskExecutorService {
     }
   }
 
+  // Tests wait queue behaviour for fragments which have reported to the AM, but have not given up their executor slot.
+  @Test (timeout = 10000)
+  public void testWaitQueueAcceptAfterAMTaskReport() throws
+      InterruptedException {
+
+    TaskExecutorServiceForTest taskExecutorService =
+        new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
+
+    // Fourth is lower priority as a result of canFinish being set to false.
+    MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l);
+    MockRequest r2 = createMockRequest(2, 1, 1, 200, 2000, true, 20000l);
+    MockRequest r3 = createMockRequest(3, 1, 2, 300, 420, true, 20000l);
+    MockRequest r4 = createMockRequest(4, 1, 3, 400, 510, false, 20000l);
+
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+    try {
+      Scheduler.SubmissionState submissionState;
+      submissionState = taskExecutorService.schedule(r1);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+      r1.awaitStart();
+
+      submissionState = taskExecutorService.schedule(r2);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+      submissionState = taskExecutorService.schedule(r3);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+      submissionState = taskExecutorService.schedule(r4);
+      assertEquals(Scheduler.SubmissionState.REJECTED, submissionState);
+
+      // Mark a fragment as completing, but don't actually complete it yet.
+      // The wait queue should now have capacity to accept one more fragment.
+      taskExecutorService.fragmentCompleting(r1.getRequestId(),
+          SchedulerFragmentCompletingListener.State.SUCCESS);
+
+      submissionState = taskExecutorService.schedule(r4);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+      assertEquals(3, taskExecutorService.waitQueue.size());
+      assertEquals(1, taskExecutorService.completingFragmentMap.size());
+
+      r1.complete();
+      r1.awaitEnd();
+      // r2 can only start once one fragment has completed. The map should be clear at this point.
+      awaitStartAndSchedulerRun(r2, taskExecutorService);
+      assertEquals(0, taskExecutorService.completingFragmentMap.size());
+
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
   @Test(timeout = 10000)
   public void testWaitQueuePreemption() throws InterruptedException {
     MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l);
-    MockRequest r2 = createMockRequest(2, 1, 200, 330, false, 20000l);
-    MockRequest r3 = createMockRequest(3, 1, 300, 420, false, 20000l);
-    MockRequest r4 = createMockRequest(4, 1, 400, 510, false, 20000l);
+    MockRequest r2 = createMockRequest(2, 1, 1,200, 330, false, 20000l);
+    MockRequest r3 = createMockRequest(3, 2, 2,300, 420, false, 20000l);
+    MockRequest r4 = createMockRequest(4, 1, 3,400, 510, false, 20000l);
     MockRequest r5 = createMockRequest(5, 1, 500, 610, true, 20000l);
 
     TaskExecutorServiceForTest taskExecutorService =
@@ -190,8 +244,7 @@ public class TestTaskExecutorService {
     try {
       taskExecutorService.schedule(r1);
 
-      // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
-      // This currently serves to allow the task to be removed from the waitQueue.
+      // One scheduling run will happen, which may or may not pick up this task in the test.
       awaitStartAndSchedulerRun(r1, taskExecutorService);
       Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2);
       assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);

http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 8cce0cb..79c2564 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -107,16 +107,16 @@ public class TestFirstInFirstOutComparator {
     TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r4, queue.peek());
     // this offer will be accepted and r1 evicted
-    assertEquals(r1, queue.offer(r5));
+    assertEquals(r1, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r4, queue.take());
     assertEquals(r3, queue.take());
@@ -129,16 +129,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r4, queue.peek());
     // this offer will be accepted and r1 evicted
-    assertEquals(r1, queue.offer(r5));
+    assertEquals(r1, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r4, queue.take());
     assertEquals(r3, queue.take());
@@ -151,16 +151,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r3, queue.peek());
     // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
+    assertEquals(r2, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r1, queue.take());
@@ -173,16 +173,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r3, queue.peek());
     // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
+    assertEquals(r2, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r1, queue.take());
@@ -195,16 +195,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
+    assertEquals(r2, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r1, queue.take());
     assertEquals(r4, queue.take());
@@ -217,16 +217,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r4, queue.peek());
     // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
+    assertEquals(r1, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r4, queue.take());
     assertEquals(r3, queue.take());
@@ -239,16 +239,16 @@ public class TestFirstInFirstOutComparator {
     r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r4, queue.peek());
     // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
+    assertEquals(r1, queue.offer(r5, 0));
     assertEquals(r5, queue.take());
     assertEquals(r4, queue.take());
     assertEquals(r3, queue.take());
@@ -264,9 +264,9 @@ public class TestFirstInFirstOutComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new FirstInFirstOutComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
@@ -282,13 +282,13 @@ public class TestFirstInFirstOutComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 3);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     // can not queue more requests as queue is full
     TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000);
-    assertEquals(r4, queue.offer(r4));
+    assertEquals(r4, queue.offer(r4, 0));
   }
 
   @Test(timeout = 60000)
@@ -300,9 +300,9 @@ public class TestFirstInFirstOutComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new FirstInFirstOutComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());

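The ShortestJobFirstComparator hunks below exercise the same two-argument offer; which element is evicted, or whether the offer is rejected outright (the hunks marked "this offer will be rejected"), is decided entirely by the comparator. A small driver against the sketch above, using natural integer ordering as a stand-in comparator, shows the three return values the assertions distinguish; the demo class and its ordering are hypothetical and depend on the sketch from the previous note.

import java.util.Comparator;

public final class OfferContractDemo {
  public static void main(String[] args) {
    // Smaller integers rank higher, loosely mirroring "shortest job first".
    BoundedEvictingQueueSketch<Integer> queue =
        new BoundedEvictingQueueSketch<>(Comparator.naturalOrder(), 2);

    System.out.println(queue.offer(5, 0));   // null -> accepted
    System.out.println(queue.offer(7, 0));   // null -> accepted, queue now full
    System.out.println(queue.offer(3, 0));   // 7    -> accepted, 7 evicted
    System.out.println(queue.offer(9, 0));   // 9    -> rejected, ranks below everything queued
    System.out.println(queue.offer(9, 1));   // null -> accepted, one extra slot allowed
  }
}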
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 0059d0c..b348bd6 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -35,16 +35,16 @@ public class TestShortestJobFirstComparator {
     TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000);
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // this offer will be rejected
-    assertEquals(r5, queue.offer(r5));
+    assertEquals(r5, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
@@ -57,16 +57,16 @@ public class TestShortestJobFirstComparator {
     r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // this offer will be rejected
-    assertEquals(r5, queue.offer(r5));
+    assertEquals(r5, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
@@ -79,16 +79,16 @@ public class TestShortestJobFirstComparator {
     r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
+    assertEquals(r4, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r5, queue.take());
@@ -101,16 +101,16 @@ public class TestShortestJobFirstComparator {
     r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
+    assertEquals(r4, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r5, queue.take());
@@ -123,16 +123,16 @@ public class TestShortestJobFirstComparator {
     r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r1, queue.peek());
     // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
+    assertEquals(r4, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
@@ -145,16 +145,16 @@ public class TestShortestJobFirstComparator {
     r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r2, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r3, 0));
     assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r4));
+    assertNull(queue.offer(r4, 0));
     assertEquals(r2, queue.peek());
     // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
+    assertEquals(r1, queue.offer(r5, 0));
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
@@ -170,9 +170,9 @@ public class TestShortestJobFirstComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
@@ -188,13 +188,13 @@ public class TestShortestJobFirstComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 3);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     // can not queue more requests as queue is full
     TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
-    assertEquals(r4, queue.offer(r4));
+    assertEquals(r4, queue.offer(r4, 0));
   }
 
   @Test(timeout = 60000)
@@ -206,9 +206,9 @@ public class TestShortestJobFirstComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
@@ -224,9 +224,9 @@ public class TestShortestJobFirstComparator {
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r1, queue.take());
     assertEquals(r2, queue.take());
@@ -241,9 +241,9 @@ public class TestShortestJobFirstComparator {
 
     queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);
 
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
+    assertNull(queue.offer(r1, 0));
+    assertNull(queue.offer(r2, 0));
+    assertNull(queue.offer(r3, 0));
 
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());