You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2017/02/03 03:13:24 UTC

hive git commit: HIVE-15779: LLAP: WaitQueue comparators should return 0 when tasks of the same DAG are of same priority (Rajesh Balamohan reviewed by Siddharth Seth, Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master feaa65fce -> 5d637a62b


HIVE-15779: LLAP: WaitQueue comparators should return 0 when tasks of the same DAG are of same priority (Rajesh Balamohan reviewed by Siddharth Seth, Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d637a62
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d637a62
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d637a62

Branch: refs/heads/master
Commit: 5d637a62b56eb7a05c115512df8f1057b02793a9
Parents: feaa65f
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 3 08:41:13 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 3 08:42:20 2017 +0530

----------------------------------------------------------------------
 .../comparator/FirstInFirstOutComparator.java   |  6 +-
 .../comparator/ShortestJobFirstComparator.java  |  6 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    | 17 +++-
 .../TestFirstInFirstOutComparator.java          | 73 ++++++++++-----
 .../TestShortestJobFirstComparator.java         | 98 ++++++++++++--------
 5 files changed, 126 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
index 447fc7b..ae1ca5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
@@ -53,11 +53,7 @@ public class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
     if (o1.getQueryId().equals(o2.getQueryId())) {
       // Same Query
       // Within dag priority - lower values indicate higher priority.
-      if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
-        return -1;
-      } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
-        return 1;
-      }
+      return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
     }
 
     if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 9b6c894..b54f740 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -43,11 +43,7 @@ public class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
     if (o1.getQueryId().equals(o2.getQueryId())) {
       // Same Query
       // Within dag priority - lower values indicate higher priority.
-      if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
-        return -1;
-      } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
-        return 1;
-      }
+      return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
     }
 
     // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and

http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 2cd6542..6506d07 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -90,9 +90,24 @@ public class TaskExecutorTestHelpers {
   }
 
   public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+      int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime,
+      long currentAttemptStartTime, String dagName) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime,
+        currentAttemptStartTime, 1, dagName);
+  }
+
+  public static SubmitWorkRequestProto createSubmitWorkRequestProto(
       int fragmentNumber, int selfAndUpstreamParallelism,
       int selfAndUpstreamComplete, long firstAttemptStartTime,
       long currentAttemptStartTime, int withinDagPriority) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime,
+        currentAttemptStartTime, withinDagPriority, "MockDag");
+  }
+
+  public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+      int fragmentNumber, int selfAndUpstreamParallelism,
+      int selfAndUpstreamComplete, long firstAttemptStartTime,
+      long currentAttemptStartTime, int withinDagPriority, String dagName) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -103,7 +118,7 @@ public class TaskExecutorTestHelpers {
         .setWorkSpec(
             VertexOrBinary.newBuilder().setVertex(
             SignableVertexSpec.newBuilder()
-                .setDagName("MockDag")
+                .setDagName(dagName)
                 .setUser("MockUser")
                 .setTokenIdentifier("MockToken_1")
                 .setQueryIdentifier(

http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 5ea62aa..8cce0cb 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -39,14 +39,23 @@ import org.junit.Test;
 public class TestFirstInFirstOutComparator {
 
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
-                                               int attemptStartTime) {
+      int attemptStartTime) {
     // Same priority for all tasks.
     return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
   }
 
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
-                                               int numSelfAndUpstreamComplete, int dagStartTime,
-                                               int attemptStartTime, int withinDagPriority) {
+      int numSelfAndUpstreamComplete, int dagStartTime,
+      int attemptStartTime, int withinDagPriority) {
+    return createRequest(fragmentNumber, numSelfAndUpstreamTasks, numSelfAndUpstreamComplete,
+        dagStartTime, attemptStartTime, withinDagPriority, "MockDag");
+  }
+
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+      int numSelfAndUpstreamComplete, int dagStartTime,
+      int attemptStartTime, int withinDagPriority,
+      String dagName) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -58,22 +67,22 @@ public class TestFirstInFirstOutComparator {
         .setFragmentNumber(fragmentNumber)
         .setWorkSpec(
             VertexOrBinary.newBuilder().setVertex(
-            SignableVertexSpec
-                .newBuilder()
-                .setQueryIdentifier(
-                    QueryIdentifierProto.newBuilder()
-                        .setApplicationIdString(appId.toString())
-                        .setAppAttemptNumber(0)
-                        .setDagIndex(dagId.getId())
-                        .build())
-                .setVertexIndex(vId.getId())
-                .setDagName("MockDag")
-                .setVertexName("MockVertex")
-                .setUser("MockUser")
-                .setTokenIdentifier("MockToken_1")
-                .setProcessorDescriptor(
-                    EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
-                .build()).build())
+                SignableVertexSpec
+                    .newBuilder()
+                    .setQueryIdentifier(
+                        QueryIdentifierProto.newBuilder()
+                            .setApplicationIdString(appId.toString())
+                            .setAppAttemptNumber(0)
+                            .setDagIndex(dagId.getId())
+                            .build())
+                    .setVertexIndex(vId.getId())
+                    .setDagName(dagName)
+                    .setVertexName("MockVertex")
+                    .setUser("MockUser")
+                    .setTokenIdentifier("MockToken_1")
+                    .setProcessorDescriptor(
+                        EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+                    .build()).build())
         .setAmHost("localhost")
         .setAmPort(12345)
         .setContainerIdString("MockContainer_1")
@@ -240,8 +249,8 @@ public class TestFirstInFirstOutComparator {
     assertEquals(r4, queue.peek());
     // offer accepted, r1 evicted
     assertEquals(r1, queue.offer(r5));
-    assertEquals(r4, queue.take());
     assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
     assertEquals(r3, queue.take());
     assertEquals(r2, queue.take());
   }
@@ -265,10 +274,28 @@ public class TestFirstInFirstOutComparator {
   }
 
   @Test(timeout = 60000)
+  public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), true, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 3);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    // can not queue more requests as queue is full
+    TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000);
+    assertEquals(r4, queue.offer(r4));
+  }
+
+  @Test(timeout = 60000)
   public void testWaitQueueComparatorParallelism() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1, "q1"), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1, "q2"), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1, "q3"), false, 100000);
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new FirstInFirstOutComparator(), 4);

http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index e82f756..0059d0c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -28,11 +28,11 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparator() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
-    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
-    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), false, 1000000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000);
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -50,11 +50,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -72,11 +72,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000, "q1"), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900, "q2"), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800, "q3"), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700, "q4"), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -94,11 +94,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -116,11 +116,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -138,11 +138,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -180,10 +180,28 @@ public class TestShortestJobFirstComparator {
   }
 
   @Test(timeout = 60000)
+  public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new ShortestJobFirstComparator(), 3);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    // can not queue more requests as queue is full
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
+    assertEquals(r4, queue.offer(r4));
+  }
+
+  @Test(timeout = 60000)
   public void testWaitQueueComparatorParallelism() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1), false, 100000); // 5 pending
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1, "q1"), false, 100000); // 7 pending
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1, "q2"), false, 100000); // 3 pending
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1, "q3"), false, 100000); // 5 pending
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
@@ -199,12 +217,12 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparatorAging() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200), true, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200), true, 100000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200, "q2"), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200, "q3"), true, 100000);
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-      new ShortestJobFirstComparator(), 4);
+        new ShortestJobFirstComparator(), 4);
 
     assertNull(queue.offer(r1));
     assertNull(queue.offer(r2));
@@ -215,11 +233,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
 
     // priority = 10 / (200 - 100) = 0.01
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000);
     // priority = 20 / (3000 - 100) = 0.0069
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000, "q2"), true, 100000);
     // priority = 30 / (4000 - 100) = 0.0076
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000, "q3"), true, 100000);
 
     queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);