You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/01/27 08:43:57 UTC

hive git commit: HIVE-15669: LLAP: Improve aging in shortest job first scheduler (Prasanth Jayachandran reviewed by Rajesh Balamohan)

Repository: hive
Updated Branches:
  refs/heads/master 16ed90aef -> 1be08d13a


HIVE-15669: LLAP: Improve aging in shortest job first scheduler (Prasanth Jayachandran reviewed by Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1be08d13
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1be08d13
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1be08d13

Branch: refs/heads/master
Commit: 1be08d13a0198696466f1405a2dd2b95692f75ab
Parents: 16ed90a
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jan 27 00:43:43 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jan 27 00:43:43 2017 -0800

----------------------------------------------------------------------
 .../comparator/ShortestJobFirstComparator.java  |  17 +--
 .../daemon/impl/TaskExecutorTestHelpers.java    |  21 ++--
 .../daemon/impl/TestTaskExecutorService.java    |  26 ++---
 .../TestShortestJobFirstComparator.java         | 107 ++++++++++++-------
 4 files changed, 106 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1be08d13/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 238ae9e..9b6c894 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -54,17 +54,22 @@ public class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
     // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
     int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
     int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
-    if (knownPending1 < knownPending2) {
-      return -1;
-    } else if (knownPending1 > knownPending2) {
-      return 1;
+    // longer the wait time for an attempt wrt to its start time, higher the priority it gets
+    long waitTime1 = fri1.getCurrentAttemptStartTime() - fri1.getFirstAttemptStartTime();
+    long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime();
+
+    if (waitTime1 == 0 || waitTime2 == 0) {
+      return knownPending1 - knownPending2;
     }
 
-    if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
+    double ratio1 = (double) knownPending1 / (double) waitTime1;
+    double ratio2 = (double) knownPending2 / (double) waitTime2;
+    if (ratio1 < ratio2) {
       return -1;
-    } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
+    } else if (ratio1 > ratio2) {
       return 1;
     }
+
     return 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1be08d13/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 73bb68e..2cd6542 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -47,10 +47,10 @@ public class TaskExecutorTestHelpers {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
 
-  public static MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime,
-                                              boolean canFinish, long workTime) {
+  public static MockRequest createMockRequest(int fragmentNum, int parallelism, long firstAttemptStartTime,
+    long currentAttemptStartTime, boolean canFinish, long workTime) {
     SubmitWorkRequestProto
-        request = createSubmitWorkRequestProto(fragmentNum, parallelism, startTime);
+        request = createSubmitWorkRequestProto(fragmentNum, parallelism, firstAttemptStartTime, currentAttemptStartTime);
     return createMockRequest(canFinish, workTime, request);
   }
 
@@ -83,16 +83,16 @@ public class TaskExecutorTestHelpers {
   }
 
   public static SubmitWorkRequestProto createSubmitWorkRequestProto(
-      int fragmentNumber, int selfAndUpstreamParallelism,
-      long attemptStartTime) {
-    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0,
-        attemptStartTime, 1);
+      int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime,
+      long currentAttemptStartTime) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime,
+      currentAttemptStartTime, 1);
   }
 
   public static SubmitWorkRequestProto createSubmitWorkRequestProto(
       int fragmentNumber, int selfAndUpstreamParallelism,
-      int selfAndUpstreamComplete,
-      long attemptStartTime, int withinDagPriority) {
+      int selfAndUpstreamComplete, long firstAttemptStartTime,
+      long currentAttemptStartTime, int withinDagPriority) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -124,7 +124,8 @@ public class TaskExecutorTestHelpers {
         .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
             .FragmentRuntimeInfo
             .newBuilder()
-            .setFirstAttemptStartTime(attemptStartTime)
+            .setFirstAttemptStartTime(firstAttemptStartTime)
+            .setCurrentAttemptStartTime(currentAttemptStartTime)
             .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
             .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
             .setWithinDagPriority(withinDagPriority)

http://git-wip-us.apache.org/repos/asf/hive/blob/1be08d13/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index ac4e5f1..de7f2fc 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -49,10 +49,10 @@ public class TestTaskExecutorService {
 
   @Test(timeout = 5000)
   public void testPreemptionQueueComparator() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
-    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
     BlockingQueue<TaskWrapper> queue = new PriorityBlockingQueue<>(4,
         new TaskExecutorService.PreemptionQueueComparator());
 
@@ -71,8 +71,8 @@ public class TestTaskExecutorService {
 
   @Test(timeout = 10000)
   public void testFinishablePreeptsNonFinishable() throws InterruptedException {
-    MockRequest r1 = createMockRequest(1, 1, 100, false, 5000l);
-    MockRequest r2 = createMockRequest(2, 1, 100, true, 1000l);
+    MockRequest r1 = createMockRequest(1, 1, 100, 200, false, 5000l);
+    MockRequest r2 = createMockRequest(2, 1, 100, 200, true, 1000l);
     TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2,
         ShortestJobFirstComparator.class.getName(), true);
     taskExecutorService.init(new Configuration());
@@ -110,7 +110,7 @@ public class TestTaskExecutorService {
   @Test(timeout = 10000)
   public void testPreemptionStateOnTaskMoveToFinishableState() throws InterruptedException {
 
-    MockRequest r1 = createMockRequest(1, 1, 100, false, 20000l);
+    MockRequest r1 = createMockRequest(1, 1, 100, 200, false, 20000l);
 
     TaskExecutorServiceForTest taskExecutorService =
         new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
@@ -142,7 +142,7 @@ public class TestTaskExecutorService {
   @Test(timeout = 10000)
   public void testPreemptionStateOnTaskMoveToNonFinishableState() throws InterruptedException {
 
-    MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l);
+    MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l);
 
     TaskExecutorServiceForTest taskExecutorService =
         new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
@@ -176,11 +176,11 @@ public class TestTaskExecutorService {
 
   @Test(timeout = 10000)
   public void testWaitQueuePreemption() throws InterruptedException {
-    MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l);
-    MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l);
-    MockRequest r3 = createMockRequest(3, 1, 300, false, 20000l);
-    MockRequest r4 = createMockRequest(4, 1, 400, false, 20000l);
-    MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l);
+    MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l);
+    MockRequest r2 = createMockRequest(2, 1, 200, 330, false, 20000l);
+    MockRequest r3 = createMockRequest(3, 1, 300, 420, false, 20000l);
+    MockRequest r4 = createMockRequest(4, 1, 400, 510, false, 20000l);
+    MockRequest r5 = createMockRequest(5, 1, 500, 610, true, 20000l);
 
     TaskExecutorServiceForTest taskExecutorService =
         new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);

http://git-wip-us.apache.org/repos/asf/hive/blob/1be08d13/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index f50c657..e82f756 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -28,11 +28,11 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparator() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
-    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), false, 1000000);
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -50,11 +50,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -72,11 +72,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -94,11 +94,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -116,11 +116,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -138,11 +138,11 @@ public class TestShortestJobFirstComparator {
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
 
-    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
-    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
-    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
-    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
-    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
+    r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
+    r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
@@ -163,9 +163,9 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 5), false, 100000);
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
@@ -181,9 +181,9 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparatorParallelism() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1), false, 100000); // 5 pending
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 4);
@@ -196,4 +196,39 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r1, queue.take());
   }
+
+  @Test(timeout = 60000)
+  public void testWaitQueueComparatorAging() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200), true, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+      new ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r1, queue.take());
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+
+    // priority = 10 / (200 - 100) = 0.01
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
+    // priority = 20 / (3000 - 100) = 0.0069
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000), true, 100000);
+    // priority = 30 / (4000 - 100) = 0.0076
+    r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000), true, 100000);
+
+    queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
 }