You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/07/02 22:02:46 UTC

hive git commit: HIVE-11173: Fair ordering of fragments in wait queue (Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/llap 44e550b63 -> ddbf944fe


HIVE-11173: Fair ordering of fragments in wait queue (Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ddbf944f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ddbf944f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ddbf944f

Branch: refs/heads/llap
Commit: ddbf944fe31d5b1ff863628d4de46ce2c4565df7
Parents: 44e550b
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Thu Jul 2 13:02:33 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Thu Jul 2 13:02:33 2015 -0700

----------------------------------------------------------------------
 .../llap/configuration/LlapConfiguration.java   |   4 +
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  10 +-
 .../llap/daemon/impl/TaskExecutorService.java   |  57 +++-
 .../llap/daemon/impl/TaskRunnerCallable.java    |   4 +-
 .../daemon/impl/TestTaskExecutorService.java    |  16 +-
 .../daemon/impl/TestTaskExecutorService2.java   | 278 +++++++++++++++++++
 6 files changed, 353 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index f5aa2a6..b6633b8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -140,6 +140,10 @@ public class LlapConfiguration extends Configuration {
       LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
   public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
 
+  public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING =
+      LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering";
+  public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false;
+
   public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
       LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";
   public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e26852a..cba057c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -54,11 +55,11 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 // TODO Convert this to a CompositeService
 public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
@@ -93,7 +94,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
     this.queryTracker = new QueryTracker(conf, localDirsBase);
     addIfService(queryTracker);
-    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption);
+    boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING,
+        LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT);
+    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering,
+        enablePreemption);
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
         ByteBuffer.allocate(4).putInt(localShufflePort), localEnv);

http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 0fd89de..f083a48 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -100,9 +100,16 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
   private final Object lock = new Object();
 
-  public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) {
+  public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+      boolean enablePreemption) {
     super(TaskExecutorService.class.getSimpleName());
-    this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
+    final Comparator<TaskWrapper> waitQueueComparator;
+    if (useFairOrdering) {
+      waitQueueComparator = new FirstInFirstOutComparator();
+    } else {
+      waitQueueComparator = new ShortestJobFirstComparator();
+    }
+    this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
         numExecutors, // max pool size
         1, TimeUnit.MINUTES,
@@ -540,8 +547,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     }
   }
 
+  // If map tasks and reduce tasks are both in a finishable state, priority is given to the task
+  // that has fewer pending tasks (shortest job first).
   @VisibleForTesting
-  public static class WaitQueueComparator implements Comparator<TaskWrapper> {
+  public static class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
 
     @Override
     public int compare(TaskWrapper t1, TaskWrapper t2) {
@@ -570,6 +579,48 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     }
   }
 
+  // Fair (FIFO) wait queue ordering. A finishable task always orders before a non-finishable
+  // one; when both tasks agree on finishability, ties are broken by (smaller value first):
+  // 1) Dag start time
+  // 2) First attempt start time
+  // 3) Vertex parallelism
+  @VisibleForTesting
+  public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
+
+    @Override
+    public int compare(TaskWrapper t1, TaskWrapper t2) {
+      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
+      boolean newCanFinish = o1.canFinish();
+      boolean oldCanFinish = o2.canFinish();
+      if (newCanFinish == true && oldCanFinish == false) {
+        return -1; // only t1 is finishable, so t1 orders first
+      } else if (newCanFinish == false && oldCanFinish == true) {
+        return 1; // only t2 is finishable, so t2 orders first
+      }
+
+      if (o1.getDagStartTime() < o2.getDagStartTime()) {
+        return -1; // t1's DAG has the smaller start time
+      } else if (o1.getDagStartTime() > o2.getDagStartTime()) {
+        return 1;
+      }
+
+      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+        return -1;
+      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+        return 1;
+      }
+
+      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+        return -1;
+      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+        return 1;
+      }
+
+      return 0; // equal on every criterion
+    }
+  }
+
   @VisibleForTesting
   public static class PreemptionQueueComparator implements Comparator<TaskWrapper> {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1a125cb..52f21d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -478,8 +478,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
   }
 
-  public long getCurrentAttemptStartTime() {
-    return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime();
+  public long getDagStartTime() {
+    return request.getFragmentRuntimeInfo().getDagStartTime();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 6b6fac0..dd5b457 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -113,7 +113,7 @@ public class TestTaskExecutorService {
             .build())
         .build();
   }
-  
+
   @Test
   public void testWaitQueueComparator() throws InterruptedException {
     TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000);
@@ -122,7 +122,7 @@ public class TestTaskExecutorService {
     TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
     TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000);
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));
@@ -144,7 +144,7 @@ public class TestTaskExecutorService {
     r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
     r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));
@@ -166,7 +166,7 @@ public class TestTaskExecutorService {
     r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000);
     r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));
@@ -175,7 +175,7 @@ public class TestTaskExecutorService {
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r4));
     assertEquals(r1, queue.peek());
-    // offer accepted and r2 gets evicted
+    // offer accepted and r4 gets evicted
     assertEquals(r4, queue.offer(r5));
     assertEquals(r1, queue.take());
     assertEquals(r3, queue.take());
@@ -188,7 +188,7 @@ public class TestTaskExecutorService {
     r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
     r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));
@@ -210,7 +210,7 @@ public class TestTaskExecutorService {
     r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
     r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));
@@ -232,7 +232,7 @@ public class TestTaskExecutorService {
     r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
     r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
-        new TaskExecutorService.WaitQueueComparator(), 4);
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2));

http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
new file mode 100644
index 0000000..ad2a15b
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTaskExecutorService2 {
+  private static Configuration conf;
+  private static Credentials cred = new Credentials();
+
+  private static class MockRequest extends TaskRunnerCallable { // sleep-only test stub
+    private int workTime; // how long callInternal() sleeps, in milliseconds
+    private boolean canFinish; // fixed answer returned by canFinish()
+
+    public MockRequest(SubmitWorkRequestProto requestProto,
+        boolean canFinish, int workTime) {
+      super(requestProto, mock(QueryFragmentInfo.class), conf,
+          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+          mock(KilledTaskHandler.class), mock(
+          FragmentCompletionHandler.class));
+      this.workTime = workTime;
+      this.canFinish = canFinish;
+    }
+
+    @Override
+    protected TaskRunner2Result callInternal() {
+      System.out.println(super.getRequestId() + " is executing..");
+      try {
+        Thread.sleep(workTime);
+      } catch (InterruptedException e) { // interrupted mid-sleep: report the task as killed
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+      }
+      return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+    }
+
+    @Override
+    public boolean canFinish() {
+      return canFinish;
+    }
+  }
+
+  @Before
+  public void setup() {
+    conf = new Configuration();
+  }
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime,
+      int attemptStartTime) {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+    return SubmitWorkRequestProto
+        .newBuilder()
+        .setFragmentSpec(
+            FragmentSpecProto
+                .newBuilder()
+                .setAttemptNumber(0)
+                .setDagName("MockDag")
+                .setFragmentNumber(fragmentNumber)
+                .setVertexName("MockVertex")
+                .setVertexParallelism(parallelism)
+                .setProcessorDescriptor(
+                    EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+                .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
+        .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+        .setContainerIdString("MockContainer_1").setUser("MockUser")
+        .setTokenIdentifier("MockToken_1")
+        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+            .FragmentRuntimeInfo
+            .newBuilder()
+            .setDagStartTime(dagStartTime)
+            .setFirstAttemptStartTime(attemptStartTime)
+            .build())
+        .build();
+  }
+
+  @Test
+  public void testWaitQueueComparator() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // this offer will be accepted and r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // this offer will be accepted and r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r3, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r3, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r1, queue.peek());
+    // offer accepted and r2 gets evicted
+    assertEquals(r2, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r1, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // offer accepted, r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r5, queue.take());
+    assertEquals(r4, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+
+    r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
+    queue = new EvictingPriorityBlockingQueue(
+        new TaskExecutorService.FirstInFirstOutComparator(), 4);
+    assertNull(queue.offer(r1));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2));
+    assertEquals(r2, queue.peek());
+    assertNull(queue.offer(r3));
+    assertEquals(r3, queue.peek());
+    assertNull(queue.offer(r4));
+    assertEquals(r4, queue.peek());
+    // offer accepted, r1 evicted
+    assertEquals(r1, queue.offer(r5));
+    assertEquals(r4, queue.take());
+    assertEquals(r5, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r2, queue.take());
+  }
+
+  private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
+    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+    TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
+    return taskWrapper;
+  }
+}