You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/21 00:57:51 UTC

[1/2] hive git commit: HIVE-11612. Allow wait queue comparator to be specified as a classname. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap b18db4f40 -> d28b6a53e


http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
new file mode 100644
index 0000000..9dafd15
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.junit.Test;
+
+public class TestShortestJobFirstComparator {
+  // Drives ShortestJobFirstComparator through a capacity-4 EvictingPriorityBlockingQueue and
+  // checks, per scenario, what a fifth offer() returns and the order in which take() drains.
+  @Test(timeout = 5000)
+  public void testWaitQueueComparator() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000);
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // queue is full; this offer is rejected and r5 itself is returned
+    assertEquals(r5, queue.offer(r5));
+    assertEquals(r1, queue.take());
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r4, queue.take());
+    // Scenario 2: same requests, but every wrapper is created with the flag set to true.
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // queue is full; this offer is rejected and r5 itself is returned
+    assertEquals(r5, queue.offer(r5));
+    assertEquals(r1, queue.take());
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r4, queue.take());
+    // Scenario 3: mixed true/false flags; r1-r4 share the same second request argument (1).
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // offer accepted; r4 gets evicted and is returned by offer()
+    assertEquals(r4, queue.offer(r5));
+    assertEquals(r1, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r5, queue.take());
+    assertEquals(r2, queue.take());
+    // Scenario 4: same flags as scenario 3, with an increasing second request argument.
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // offer accepted; r4 gets evicted and is returned by offer()
+    assertEquals(r4, queue.offer(r5));
+    assertEquals(r1, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r5, queue.take());
+    assertEquals(r2, queue.take());
+    // Scenario 5: only r1 and r5 are created with the flag set to true.
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // offer accepted; r4 gets evicted and is returned by offer()
+    assertEquals(r4, queue.offer(r5));
+    assertEquals(r1, queue.take());
+    assertEquals(r5, queue.take());
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    // Scenario 6: only r1 is created with the flag set to false; it ends up displaced.
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r2, queue.peek());
+    // offer accepted; r1 gets evicted and is returned by offer()
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r5, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
+    // The requests differ in their last argument (10, 1, 5); take() should follow it ascending.
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
+    // The request with the fewest pending tasks (see trailing comments) is expected first.
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+}


[2/2] hive git commit: HIVE-11612. Allow wait queue comparator to be specified as a classname. (Siddharth Seth)

Posted by ss...@apache.org.
HIVE-11612. Allow wait queue comparator to be specified as a classname. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d28b6a53
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d28b6a53
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d28b6a53

Branch: refs/heads/llap
Commit: d28b6a53e0d3091452405750d28fbb687b255fbe
Parents: b18db4f
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 20 15:57:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 15:57:28 2015 -0700

----------------------------------------------------------------------
 .../llap/configuration/LlapConfiguration.java   |   7 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   7 +-
 .../llap/daemon/impl/TaskExecutorService.java   | 144 ++-----
 .../llap/daemon/impl/TaskRunnerCallable.java    |   4 +-
 .../comparator/FirstInFirstOutComparator.java   |  81 ++++
 .../comparator/ShortestJobFirstComparator.java  |  70 ++++
 .../daemon/impl/TaskExecutorTestHelpers.java    | 238 +++++++++++
 .../daemon/impl/TestTaskExecutorService.java    | 404 +------------------
 .../daemon/impl/TestTaskExecutorService2.java   | 324 ---------------
 .../TestFirstInFirstOutComparator.java          | 321 +++++++++++++++
 .../TestShortestJobFirstComparator.java         | 199 +++++++++
 11 files changed, 958 insertions(+), 841 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index b6633b8..0c90fe8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -140,9 +140,10 @@ public class LlapConfiguration extends Configuration {
       LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
   public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
 
-  public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING =
-      LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering";
-  public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false;
+  public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME =
+      LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name";
+  public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT =
+      "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator";
 
   public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
       LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 710c593..411d965 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -96,9 +96,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
     this.queryTracker = new QueryTracker(conf, localDirsBase);
     addIfService(queryTracker);
-    boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING,
-        LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT);
-    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering,
+    String waitQueueSchedulerClassName =
+        conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME,
+            LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT);
+    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName,
         enablePreemption);
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index f99c05d..badeb63 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.text.SimpleDateFormat;
 import java.util.Comparator;
 import java.util.Date;
@@ -103,14 +105,32 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
   private final Object lock = new Object();
 
-  public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+  public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
       boolean enablePreemption) {
     super(TaskExecutorService.class.getSimpleName());
+    LOG.info("TaskExecutorService is being setup with parameters: "
+        + "numExecutors=" + numExecutors
+        + ", waitQueueSize=" + waitQueueSize
+        + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName
+        + ", enablePreemption=" + enablePreemption);
+
     final Comparator<TaskWrapper> waitQueueComparator;
-    if (useFairOrdering) {
-      waitQueueComparator = new FirstInFirstOutComparator();
-    } else {
-      waitQueueComparator = new ShortestJobFirstComparator();
+    try {
+      Class<? extends Comparator> waitQueueComparatorClazz =
+          (Class<? extends Comparator>) Class.forName(
+              waitQueueComparatorClassName);
+      Constructor<? extends Comparator> ctor = waitQueueComparatorClazz.getConstructor(null);
+      waitQueueComparator = ctor.newInstance(null);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(
+          "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" +
+          waitQueueComparatorClassName, e);
+    } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(
+          "Failed to find instantiate wait queue comparator, class=" + waitQueueComparatorClassName,
+          e);
     }
     this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
@@ -137,10 +157,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
     Futures.addCallback(future, new WaitQueueWorkerCallback());
 
-    LOG.info("TaskExecutorService started with parameters: "
-            + "numExecutors=" + numExecutors
-            + ", waitQueueSize=" + waitQueueSize
-            + ", enablePreemption=" + enablePreemption);
+
   }
 
   @Override
@@ -577,116 +594,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     }
   }
 
-  // if map tasks and reduce tasks are in finishable state then priority is given to the task
-  // that has less number of pending tasks (shortest job)
-  @VisibleForTesting
-  public static class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
-
-    @Override
-    public int compare(TaskWrapper t1, TaskWrapper t2) {
-      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
-      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean o1CanFinish = o1.canFinish();
-      boolean o2CanFinish = o2.canFinish();
-      if (o1CanFinish == true && o2CanFinish == false) {
-        return -1;
-      } else if (o1CanFinish == false && o2CanFinish == true) {
-        return 1;
-      }
-
-      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
-      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
-
-      // Check if these belong to the same task, and work with withinDagPriority
-      if (o1.getQueryId().equals(o2.getQueryId())) {
-        // Same Query
-        // Within dag priority - lower values indicate higher priority.
-        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
-          return -1;
-        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
-          return 1;
-        }
-      }
-
-      // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
-      // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
-      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
-      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
-      if (knownPending1 < knownPending2) {
-        return -1;
-      } else if (knownPending1 > knownPending2) {
-        return 1;
-      }
-
-      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
-        return -1;
-      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
-        return 1;
-      }
-      return 0;
-    }
-  }
-
-  // if map tasks and reduce tasks are in finishable state then priority is given to the task in
-  // the following order
-  // 1) Dag start time
-  // 2) Within dag priority
-  // 3) Attempt start time
-  // 4) Vertex parallelism
-  @VisibleForTesting
-  public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
-
-    @Override
-    public int compare(TaskWrapper t1, TaskWrapper t2) {
-      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
-      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean o1CanFinish = o1.canFinish();
-      boolean o2CanFinish = o2.canFinish();
-      if (o1CanFinish == true && o2CanFinish == false) {
-        return -1;
-      } else if (o1CanFinish == false && o2CanFinish == true) {
-        return 1;
-      }
-
-      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
-      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
-
-      if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
-        return -1;
-      } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
-        return 1;
-      }
-
-      // Check if these belong to the same task, and work with withinDagPriority
-      if (o1.getQueryId().equals(o2.getQueryId())) {
-        // Same Query
-        // Within dag priority - lower values indicate higher priority.
-        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
-          return -1;
-        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
-          return 1;
-        }
-      }
-
-      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
-        return -1;
-      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
-        return 1;
-      }
 
-      // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
-      // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
-      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
-      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
-      if (knownPending1 < knownPending2) {
-        return -1;
-      } else if (knownPending1 > knownPending2) {
-        return 1;
-      }
-
-      return 0;
-    }
-  }
 
   @VisibleForTesting
   public static class PreemptionQueueComparator implements Comparator<TaskWrapper> {

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 6ceb2e5..e0bd48a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -105,7 +106,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final AtomicBoolean isCompleted = new AtomicBoolean(false);
   private final AtomicBoolean killInvoked = new AtomicBoolean(false);
 
-  TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
+  @VisibleForTesting
+  public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
                      Configuration conf,
                      ExecutionContext executionContext, Map<String, String> envMap,
                      Credentials credentials,

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
new file mode 100644
index 0000000..447fc7b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+
+// Wait queue ordering in which a task that can finish sorts ahead of one that cannot.
+// Remaining ties are broken in the following order
+// 1) Dag start time
+// 2) Within dag priority (only between fragments of the same query)
+// 3) Attempt start time
+// 4) Known pending tasks for the vertex and its upstream hierarchy
+public class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
+
+  // Returns -1 when t1 sorts ahead of t2, 1 when it sorts behind, and 0 when none of the
+  // criteria separates the two. Criteria are applied in the order they appear below; the
+  // first one that differs decides.
+  @Override
+  public int compare(TaskWrapper t1, TaskWrapper t2) {
+    TaskRunnerCallable left = t1.getTaskRunnerCallable();
+    TaskRunnerCallable right = t2.getTaskRunnerCallable();
+
+    // A task that can finish always sorts ahead of one that cannot.
+    boolean leftCanFinish = left.canFinish();
+    boolean rightCanFinish = right.canFinish();
+    if (leftCanFinish != rightCanFinish) {
+      return leftCanFinish ? -1 : 1;
+    }
+
+    LlapDaemonProtocolProtos.FragmentRuntimeInfo leftInfo = left.getFragmentRuntimeInfo();
+    LlapDaemonProtocolProtos.FragmentRuntimeInfo rightInfo = right.getFragmentRuntimeInfo();
+
+    // Earlier DAG start time sorts first.
+    if (leftInfo.getDagStartTime() != rightInfo.getDagStartTime()) {
+      return leftInfo.getDagStartTime() < rightInfo.getDagStartTime() ? -1 : 1;
+    }
+
+    // Within-DAG priority is consulted only when both fragments belong to the same query;
+    // lower values indicate higher priority.
+    boolean sameQuery = left.getQueryId().equals(right.getQueryId());
+    if (sameQuery && leftInfo.getWithinDagPriority() != rightInfo.getWithinDagPriority()) {
+      return leftInfo.getWithinDagPriority() < rightInfo.getWithinDagPriority() ? -1 : 1;
+    }
+
+    // Earlier first attempt start time sorts first.
+    if (leftInfo.getFirstAttemptStartTime() != rightInfo.getFirstAttemptStartTime()) {
+      return leftInfo.getFirstAttemptStartTime() < rightInfo.getFirstAttemptStartTime() ? -1 : 1;
+    }
+
+    // Fewer known pending tasks sorts first.
+    int leftPending = knownPending(leftInfo);
+    int rightPending = knownPending(rightInfo);
+    if (leftPending != rightPending) {
+      return leftPending < rightPending ? -1 : 1;
+    }
+
+    return 0;
+  }
+
+  // Known pending tasks: selfAndUpstream counts tasks for the current vertex and its parent
+  // hierarchy; selfAndUpstreamCompleted counts how many of those have completed.
+  private static int knownPending(LlapDaemonProtocolProtos.FragmentRuntimeInfo info) {
+    return info.getNumSelfAndUpstreamTasks() - info.getNumSelfAndUpstreamCompletedTasks();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
new file mode 100644
index 0000000..238ae9e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+
+// Tasks that can finish sort first; ties go to within dag priority (same query only), then to
+// the fewest known pending tasks (shortest job), then to the earliest first attempt start time.
+public class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
+
+  // Returns -1 when t1 sorts ahead of t2, 1 when it sorts behind, and 0 when none of the
+  // criteria separates the two; the first criterion that differs decides.
+  @Override
+  public int compare(TaskWrapper t1, TaskWrapper t2) {
+    TaskRunnerCallable left = t1.getTaskRunnerCallable();
+    TaskRunnerCallable right = t2.getTaskRunnerCallable();
+
+    // A task that can finish always sorts ahead of one that cannot.
+    boolean leftCanFinish = left.canFinish();
+    boolean rightCanFinish = right.canFinish();
+    if (leftCanFinish != rightCanFinish) {
+      return leftCanFinish ? -1 : 1;
+    }
+
+    LlapDaemonProtocolProtos.FragmentRuntimeInfo leftInfo = left.getFragmentRuntimeInfo();
+    LlapDaemonProtocolProtos.FragmentRuntimeInfo rightInfo = right.getFragmentRuntimeInfo();
+
+    // Within-DAG priority is consulted only when both fragments belong to the same query;
+    // lower values indicate higher priority.
+    boolean sameQuery = left.getQueryId().equals(right.getQueryId());
+    if (sameQuery && leftInfo.getWithinDagPriority() != rightInfo.getWithinDagPriority()) {
+      return leftInfo.getWithinDagPriority() < rightInfo.getWithinDagPriority() ? -1 : 1;
+    }
+
+    // Fewer known pending tasks (the shorter job) sorts first.
+    int leftPending = knownPending(leftInfo);
+    int rightPending = knownPending(rightInfo);
+    if (leftPending != rightPending) {
+      return leftPending < rightPending ? -1 : 1;
+    }
+
+    // Earlier first attempt start time sorts first.
+    if (leftInfo.getFirstAttemptStartTime() != rightInfo.getFirstAttemptStartTime()) {
+      return leftInfo.getFirstAttemptStartTime() < rightInfo.getFirstAttemptStartTime() ? -1 : 1;
+    }
+    return 0;
+  }
+
+  // Known pending tasks: selfAndUpstream counts tasks for the current vertex and its parent
+  // hierarchy; selfAndUpstreamCompleted counts how many of those have completed.
+  private static int knownPending(LlapDaemonProtocolProtos.FragmentRuntimeInfo info) {
+    return info.getNumSelfAndUpstreamTasks() - info.getNumSelfAndUpstreamCompletedTasks();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
new file mode 100644
index 0000000..ec1ffcf
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Shared fixtures for the task executor tests: request protos, mock requests, task wrappers. */
+public class TaskExecutorTestHelpers {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorTestHelpers.class);
+
+  /** Creates a {@link MockRequest} with 0 completed upstream tasks and withinDagPriority 1. */
+  public static MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime,
+                                              boolean canFinish, long workTime) {
+    SubmitWorkRequestProto requestProto =
+        createSubmitWorkRequestProto(fragmentNum, parallelism, startTime);
+    return new MockRequest(requestProto, canFinish, workTime);
+  }
+
+  /** Wraps {@code request} in a {@link MockRequest} and that in a TaskWrapper. */
+  public static TaskExecutorService.TaskWrapper createTaskWrapper(
+      SubmitWorkRequestProto request, boolean canFinish, int workTime) {
+    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+    return new TaskExecutorService.TaskWrapper(mockRequest, null);
+  }
+
+  /** Same as the five-argument overload, with 0 completed tasks and withinDagPriority 1. */
+  public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+      int fragmentNumber, int selfAndUpstreamParallelism, long attemptStartTime) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0,
+        attemptStartTime, 1);
+  }
+
+  /**
+   * Builds a request proto with fixed mock identifiers. Only the fragment number (also used
+   * as the Tez task attempt id) and the FragmentRuntimeInfo fields come from the arguments.
+   */
+  public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+      int fragmentNumber, int selfAndUpstreamParallelism, int selfAndUpstreamComplete,
+      long attemptStartTime, int withinDagPriority) {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+    LlapDaemonProtocolProtos.FragmentSpecProto fragmentSpec =
+        LlapDaemonProtocolProtos.FragmentSpecProto.newBuilder()
+            .setAttemptNumber(0)
+            .setDagName("MockDag")
+            .setFragmentNumber(fragmentNumber)
+            .setVertexName("MockVertex")
+            .setProcessorDescriptor(
+                LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder()
+                    .setClassName("MockProcessor").build())
+            .setFragmentIdentifierString(taId.toString()).build();
+    LlapDaemonProtocolProtos.FragmentRuntimeInfo runtimeInfo =
+        LlapDaemonProtocolProtos.FragmentRuntimeInfo.newBuilder()
+            .setFirstAttemptStartTime(attemptStartTime)
+            .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
+            .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
+            .setWithinDagPriority(withinDagPriority).build();
+    return SubmitWorkRequestProto.newBuilder()
+        .setFragmentSpec(fragmentSpec)
+        .setAmHost("localhost").setAmPort(12345)
+        .setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+        .setContainerIdString("MockContainer_1").setUser("MockUser")
+        .setTokenIdentifier("MockToken_1")
+        .setFragmentRuntimeInfo(runtimeInfo).build();
+  }
+
+  /**
+   * Task callable for tests: "works" by waiting up to workTime milliseconds on a condition and
+   * ends early when {@link #killTask()} or {@link #complete()} signals it.
+   */
+  public static class MockRequest extends TaskRunnerCallable {
+    private final long workTime;
+    private final boolean canFinish;
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
+    private final AtomicBoolean wasKilled = new AtomicBoolean(false);
+    private final AtomicBoolean wasInterrupted = new AtomicBoolean(false);
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition startedCondition = lock.newCondition();
+    private final Condition sleepCondition = lock.newCondition();
+    private final Condition finishedCondition = lock.newCondition();
+
+    public MockRequest(SubmitWorkRequestProto requestProto, boolean canFinish, long workTime) {
+      super(requestProto, mock(QueryFragmentInfo.class), new Configuration(),
+          new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null,
+          mock(LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class),
+          mock(FragmentCompletionHandler.class));
+      this.workTime = workTime;
+      this.canFinish = canFinish;
+    }
+
+    /** Marks the task started, waits out the simulated work, then marks it finished. */
+    @Override
+    protected TaskRunner2Result callInternal() {
+      try {
+        logInfo(super.getRequestId() + " is executing..");
+        lock.lock();
+        try {
+          isStarted.set(true);
+          startedCondition.signalAll();
+          // killTask() sets wasKilled while holding this lock. If it already ran, its signal is
+          // lost and waiting would block for the whole workTime, so only wait when not killed.
+          if (!wasKilled.get()) {
+            sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+          }
+        } catch (InterruptedException e) {
+          wasInterrupted.set(true);
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+        } finally {
+          lock.unlock();
+        }
+        EndReason endReason = wasKilled.get() ? EndReason.KILL_REQUESTED : EndReason.SUCCESS;
+        return new TaskRunner2Result(endReason, null, false);
+      } finally {
+        lock.lock();
+        try {
+          isFinished.set(true);
+          finishedCondition.signalAll();
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+
+    /** Flags the task as killed and wakes callInternal() if it is waiting. */
+    @Override
+    public void killTask() {
+      lock.lock();
+      try {
+        wasKilled.set(true);
+        sleepCondition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    boolean hasStarted() {
+      return isStarted.get();
+    }
+
+    boolean hasFinished() {
+      return isFinished.get();
+    }
+
+    /** Returns true once {@link #killTask()} has been called. */
+    boolean wasPreempted() {
+      return wasKilled.get();
+    }
+
+    /** Wakes a waiting callInternal() without marking the task killed. */
+    void complete() {
+      lock.lock();
+      try {
+        sleepCondition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Blocks until callInternal() has begun. */
+    void awaitStart() throws InterruptedException {
+      lock.lock();
+      try {
+        while (!isStarted.get()) {
+          startedCondition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Blocks until callInternal() has returned or thrown. */
+    void awaitEnd() throws InterruptedException {
+      lock.lock();
+      try {
+        while (!isFinished.get()) {
+          finishedCondition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public boolean canFinish() {
+      return canFinish;
+    }
+  }
+
+  private static void logInfo(String message, Throwable t) {
+    LOG.info(message, t);
+  }
+
+  private static void logInfo(String message) {
+    logInfo(message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 7a01b39..34ab40a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -17,227 +17,31 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createMockRequest;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
-import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.task.EndReason;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest;
+import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator;
 import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TestTaskExecutorService {
-  private static Configuration conf;
-  private static Credentials cred = new Credentials();
-  private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
-
-  @Before
-  public void setup() {
-    conf = new Configuration();
-  }
-
-
-  @Test(timeout = 5000)
-  public void testWaitQueueComparator() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
-    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000);
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // this offer will be rejected
-    assertEquals(r5, queue.offer(r5));
-    assertEquals(r1, queue.take());
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r4, queue.take());
-
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // this offer will be rejected
-    assertEquals(r5, queue.offer(r5));
-    assertEquals(r1, queue.take());
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r4, queue.take());
-
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
-    assertEquals(r1, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r5, queue.take());
-    assertEquals(r2, queue.take());
-
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
-    assertEquals(r1, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r5, queue.take());
-    assertEquals(r2, queue.take());
-
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // offer accepted and r4 gets evicted
-    assertEquals(r4, queue.offer(r5));
-    assertEquals(r1, queue.take());
-    assertEquals(r5, queue.take());
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r2, queue.peek());
-    // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r4, queue.take());
-    assertEquals(r5, queue.take());
-  }
-
-  @Test(timeout = 5000)
-  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
-
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
-
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-  }
-
-  @Test(timeout = 5000)
-  public void testWaitQueueComparatorParallelism() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
-
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
-
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-  }
 
   @Test(timeout = 5000)
   public void testPreemptionQueueComparator() throws InterruptedException {
@@ -265,8 +69,9 @@ public class TestTaskExecutorService {
   public void testFinishablePreeptsNonFinishable() throws InterruptedException {
     MockRequest r1 = createMockRequest(1, 1, 100, false, 5000l);
     MockRequest r2 = createMockRequest(2, 1, 100, true, 1000l);
-    TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, false, true);
-    taskExecutorService.init(conf);
+    TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2,
+        ShortestJobFirstComparator.class.getName(), true);
+    taskExecutorService.init(new Configuration());
     taskExecutorService.start();
 
     try {
@@ -306,8 +111,8 @@ public class TestTaskExecutorService {
     MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l);
 
     TaskExecutorServiceForTest taskExecutorService =
-        new TaskExecutorServiceForTest(1, 2, false, true);
-    taskExecutorService.init(conf);
+        new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
+    taskExecutorService.init(new Configuration());
     taskExecutorService.start();
 
     try {
@@ -379,197 +184,12 @@ public class TestTaskExecutorService {
   }
 
 
-  // ----------- Helper classes and methods go after this point. Tests above this -----------
-
-  // Create requests with the same within dag priority
-  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
-                                                              long attemptStartTime) {
-    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1);
-  }
-
-  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
-                                                              int selfAndUpstreamComplete,
-                                                              long attemptStartTime, int withinDagPriority) {
-    ApplicationId appId = ApplicationId.newInstance(9999, 72);
-    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
-    TezTaskID tId = TezTaskID.getInstance(vId, 389);
-    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
-    return SubmitWorkRequestProto
-        .newBuilder()
-        .setFragmentSpec(
-            FragmentSpecProto
-                .newBuilder()
-                .setAttemptNumber(0)
-                .setDagName("MockDag")
-                .setFragmentNumber(fragmentNumber)
-                .setVertexName("MockVertex")
-                .setProcessorDescriptor(
-                    EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
-                .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
-        .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
-        .setContainerIdString("MockContainer_1").setUser("MockUser")
-        .setTokenIdentifier("MockToken_1")
-        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
-            .FragmentRuntimeInfo
-            .newBuilder()
-            .setFirstAttemptStartTime(attemptStartTime)
-            .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
-            .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
-            .setWithinDagPriority(withinDagPriority)
-            .build())
-        .build();
-  }
-
-  private MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime,
-                                        boolean canFinish, long workTime) {
-    SubmitWorkRequestProto requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
-        startTime);
-    MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
-    return mockRequest;
-  }
-
-  private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
-    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
-    TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
-    return taskWrapper;
-  }
-
-  private static void logInfo(String message, Throwable t) {
-    LOG.info(message, t);
-  }
-
-  private static void logInfo(String message) {
-    logInfo(message, null);
-  }
-
-  private static class MockRequest extends TaskRunnerCallable {
-    private final long workTime;
-    private final boolean canFinish;
-
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-    private final AtomicBoolean isFinished = new AtomicBoolean(false);
-    private final AtomicBoolean wasKilled = new AtomicBoolean(false);
-    private final AtomicBoolean wasInterrupted = new AtomicBoolean(false);
-
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Condition startedCondition = lock.newCondition();
-    private final Condition sleepCondition = lock.newCondition();
-    private final Condition finishedCondition = lock.newCondition();
-
-    public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
-                       boolean canFinish, long workTime) {
-      super(requestProto, mock(QueryFragmentInfo.class), conf,
-          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, mock(
-              LlapDaemonExecutorMetrics.class),
-          mock(KilledTaskHandler.class), mock(
-              FragmentCompletionHandler.class));
-      this.workTime = workTime;
-      this.canFinish = canFinish;
-    }
 
-    @Override
-    protected TaskRunner2Result callInternal() {
-      try {
-        logInfo(super.getRequestId() + " is executing..", null);
-        lock.lock();
-        try {
-          isStarted.set(true);
-          startedCondition.signal();
-        } finally {
-          lock.unlock();
-        }
-
-        lock.lock();
-        try {
-          sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-          wasInterrupted.set(true);
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
-        } finally {
-          lock.unlock();
-        }
-        if (wasKilled.get()) {
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
-        } else {
-          return new TaskRunner2Result(EndReason.SUCCESS, null, false);
-        }
-      } finally {
-        lock.lock();
-        try {
-          isFinished.set(true);
-          finishedCondition.signal();
-        } finally {
-          lock.unlock();
-        }
-      }
-    }
-
-    @Override
-    public void killTask() {
-      lock.lock();
-      try {
-        wasKilled.set(true);
-        sleepCondition.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    boolean hasStarted() {
-      return isStarted.get();
-    }
-
-    boolean hasFinished() {
-      return isFinished.get();
-    }
-
-    boolean wasPreempted() {
-      return wasKilled.get();
-    }
-
-    void complete() {
-      lock.lock();
-      try {
-        sleepCondition.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    void awaitStart() throws InterruptedException {
-      lock.lock();
-      try {
-        while (!isStarted.get()) {
-          startedCondition.await();
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    void awaitEnd() throws InterruptedException {
-      lock.lock();
-      try {
-        while (!isFinished.get()) {
-          finishedCondition.await();
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-
-
-    @Override
-    public boolean canFinish() {
-      return canFinish;
-    }
-  }
 
   private static class TaskExecutorServiceForTest extends TaskExecutorService {
-    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
                                       boolean enablePreemption) {
-      super(numExecutors, waitQueueSize, useFairOrdering, enablePreemption);
+      super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption);
     }
 
     private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
deleted file mode 100644
index 1929439..0000000
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
-import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
-import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.task.EndReason;
-import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestTaskExecutorService2 {
-  private static Configuration conf;
-  private static Credentials cred = new Credentials();
-
-  private static class MockRequest extends TaskRunnerCallable {
-    private int workTime;
-    private boolean canFinish;
-
-    public MockRequest(SubmitWorkRequestProto requestProto,
-        boolean canFinish, int workTime) {
-      super(requestProto, mock(QueryFragmentInfo.class), conf,
-          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
-          mock(KilledTaskHandler.class), mock(
-          FragmentCompletionHandler.class));
-      this.workTime = workTime;
-      this.canFinish = canFinish;
-    }
-
-    @Override
-    protected TaskRunner2Result callInternal() {
-      System.out.println(super.getRequestId() + " is executing..");
-      try {
-        Thread.sleep(workTime);
-      } catch (InterruptedException e) {
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
-      }
-      return new TaskRunner2Result(EndReason.SUCCESS, null, false);
-    }
-
-    @Override
-    public boolean canFinish() {
-      return canFinish;
-    }
-  }
-
-  @Before
-  public void setup() {
-    conf = new Configuration();
-  }
-
-  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
-                                               int attemptStartTime) {
-    // Same priority for all tasks.
-    return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
-  }
-
-  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
-                                               int numSelfAndUpstreamComplete, int dagStartTime,
-                                               int attemptStartTime, int withinDagPriority) {
-    ApplicationId appId = ApplicationId.newInstance(9999, 72);
-    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
-    TezTaskID tId = TezTaskID.getInstance(vId, 389);
-    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
-    return SubmitWorkRequestProto
-        .newBuilder()
-        .setFragmentSpec(
-            FragmentSpecProto
-                .newBuilder()
-                .setAttemptNumber(0)
-                .setDagName("MockDag")
-                .setFragmentNumber(fragmentNumber)
-                .setVertexName("MockVertex")
-                .setProcessorDescriptor(
-                    EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
-                .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
-        .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
-        .setContainerIdString("MockContainer_1").setUser("MockUser")
-        .setTokenIdentifier("MockToken_1")
-        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
-            .FragmentRuntimeInfo
-            .newBuilder()
-            .setDagStartTime(dagStartTime)
-            .setFirstAttemptStartTime(attemptStartTime)
-            .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
-            .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
-            .setWithinDagPriority(withinDagPriority)
-            .build())
-        .build();
-  }
-
-  @Test
-  public void testWaitQueueComparator() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
-    TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
-    TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r4, queue.peek());
-    // this offer will be accepted and r1 evicted
-    assertEquals(r1, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r4, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r2, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
-    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r4, queue.peek());
-    // this offer will be accpeted and r1 evicted
-    assertEquals(r1, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r4, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r2, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
-    r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r3, queue.peek());
-    // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-    assertEquals(r4, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
-    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r3, queue.peek());
-    // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-    assertEquals(r4, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
-    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r1, queue.peek());
-    // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r1, queue.take());
-    assertEquals(r4, queue.take());
-    assertEquals(r3, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
-    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r4, queue.peek());
-    // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
-    assertEquals(r5, queue.take());
-    assertEquals(r4, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r2, queue.take());
-
-    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
-    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
-    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
-    r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.FirstInFirstOutComparator(), 4);
-    assertNull(queue.offer(r1));
-    assertEquals(r1, queue.peek());
-    assertNull(queue.offer(r2));
-    assertEquals(r2, queue.peek());
-    assertNull(queue.offer(r3));
-    assertEquals(r3, queue.peek());
-    assertNull(queue.offer(r4));
-    assertEquals(r4, queue.peek());
-    // offer accepted, r1 evicted
-    assertEquals(r1, queue.offer(r5));
-    assertEquals(r4, queue.take());
-    assertEquals(r5, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r2, queue.take());
-  }
-
-  @Test(timeout = 5000)
-  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
-
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
-
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-  }
-
-  @Test(timeout = 5000)
-  public void testWaitQueueComparatorParallelism() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
-
-    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
-    assertNull(queue.offer(r1));
-    assertNull(queue.offer(r2));
-    assertNull(queue.offer(r3));
-
-    assertEquals(r2, queue.take());
-    assertEquals(r3, queue.take());
-    assertEquals(r1, queue.take());
-  }
-
-
-  private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
-    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
-    TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
-    return taskWrapper;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
new file mode 100644
index 0000000..ebfb430
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFirstInFirstOutComparator {
+  private static Configuration conf;
+  private static Credentials cred = new Credentials();
+
+  private static class MockRequest extends TaskRunnerCallable {
+    private int workTime;
+    private boolean canFinish;
+
+    public MockRequest(SubmitWorkRequestProto requestProto,
+        boolean canFinish, int workTime) {
+      super(requestProto, mock(QueryFragmentInfo.class), conf,
+          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+          mock(KilledTaskHandler.class), mock(
+          FragmentCompletionHandler.class));
+      this.workTime = workTime;
+      this.canFinish = canFinish;
+    }
+
+    @Override
+    protected TaskRunner2Result callInternal() {
+      System.out.println(super.getRequestId() + " is executing..");
+      try {
+        Thread.sleep(workTime);
+      } catch (InterruptedException e) {
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+      }
+      return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+    }
+
+    @Override
+    public boolean canFinish() {
+      return canFinish;
+    }
+  }
+
+  @Before
+  public void setup() {
+    conf = new Configuration();
+  }
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
+                                               int attemptStartTime) {
+    // Same within-DAG priority (1) for all tasks, with no upstream tasks marked complete.
+    return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
+  }
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+                                               int numSelfAndUpstreamComplete, int dagStartTime,
+                                               int attemptStartTime, int withinDagPriority) {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+    return SubmitWorkRequestProto
+        .newBuilder()
+        .setFragmentSpec(
+            FragmentSpecProto
+                .newBuilder()
+                .setAttemptNumber(0)
+                .setDagName("MockDag")
+                .setFragmentNumber(fragmentNumber)
+                .setVertexName("MockVertex")
+                .setProcessorDescriptor(
+                    EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+                .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
+        .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+        .setContainerIdString("MockContainer_1").setUser("MockUser")
+        .setTokenIdentifier("MockToken_1")
+        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+            .FragmentRuntimeInfo
+            .newBuilder()
+            .setDagStartTime(dagStartTime)
+            .setFirstAttemptStartTime(attemptStartTime)
+            .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+            .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+            .setWithinDagPriority(withinDagPriority)
+            .build())
+        .build();
+  }
+
+  @Test
+  public void testWaitQueueComparator() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // this offer will be accepted and r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // this offer will be accepted and r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r3, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r3, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // offer accepted, r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // offer accepted, r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r4, queue.take());
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new FirstInFirstOutComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new FirstInFirstOutComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+}