You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/07/02 22:02:46 UTC
hive git commit: HIVE-11173: Fair ordering of fragments in wait queue
(Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/llap 44e550b63 -> ddbf944fe
HIVE-11173: Fair ordering of fragments in wait queue (Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ddbf944f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ddbf944f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ddbf944f
Branch: refs/heads/llap
Commit: ddbf944fe31d5b1ff863628d4de46ce2c4565df7
Parents: 44e550b
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Thu Jul 2 13:02:33 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Thu Jul 2 13:02:33 2015 -0700
----------------------------------------------------------------------
.../llap/configuration/LlapConfiguration.java | 4 +
.../llap/daemon/impl/ContainerRunnerImpl.java | 10 +-
.../llap/daemon/impl/TaskExecutorService.java | 57 +++-
.../llap/daemon/impl/TaskRunnerCallable.java | 4 +-
.../daemon/impl/TestTaskExecutorService.java | 16 +-
.../daemon/impl/TestTaskExecutorService2.java | 278 +++++++++++++++++++
6 files changed, 353 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index f5aa2a6..b6633b8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -140,6 +140,10 @@ public class LlapConfiguration extends Configuration {
LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
+ public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING =
+ LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering";
+ public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false;
+
public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";
public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e26852a..cba057c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -54,11 +55,11 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-
-import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
// TODO Convert this to a CompositeService
public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
@@ -93,7 +94,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.queryTracker = new QueryTracker(conf, localDirsBase);
addIfService(queryTracker);
- this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption);
+ boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING,
+ LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT);
+ this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering,
+ enablePreemption);
AuxiliaryServiceHelper.setServiceDataIntoEnv(
TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
ByteBuffer.allocate(4).putInt(localShufflePort), localEnv);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 0fd89de..f083a48 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -100,9 +100,16 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
private final Object lock = new Object();
- public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) {
+ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+ boolean enablePreemption) {
super(TaskExecutorService.class.getSimpleName());
- this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
+ final Comparator<TaskWrapper> waitQueueComparator;
+ if (useFairOrdering) {
+ waitQueueComparator = new FirstInFirstOutComparator();
+ } else {
+ waitQueueComparator = new ShortestJobFirstComparator();
+ }
+ this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
numExecutors, // max pool size
1, TimeUnit.MINUTES,
@@ -540,8 +547,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
+ // if map tasks and reduce tasks are in finishable state then priority is given to the task
+ // that has less number of pending tasks (shortest job)
@VisibleForTesting
- public static class WaitQueueComparator implements Comparator<TaskWrapper> {
+ public static class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
@Override
public int compare(TaskWrapper t1, TaskWrapper t2) {
@@ -570,6 +579,48 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
+ // if map tasks and reduce tasks are in finishable state then priority is given to the task in
+ // the following order
+ // 1) Dag start time
+ // 2) Attempt start time
+ // 3) Vertex parallelism
+ @VisibleForTesting
+ public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
+
+ @Override
+ public int compare(TaskWrapper t1, TaskWrapper t2) {
+ TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+ TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
+ boolean newCanFinish = o1.canFinish();
+ boolean oldCanFinish = o2.canFinish();
+ if (newCanFinish == true && oldCanFinish == false) {
+ return -1;
+ } else if (newCanFinish == false && oldCanFinish == true) {
+ return 1;
+ }
+
+ if (o1.getDagStartTime() < o2.getDagStartTime()) {
+ return -1;
+ } else if (o1.getDagStartTime() > o2.getDagStartTime()) {
+ return 1;
+ }
+
+ if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+ return -1;
+ } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+ return 1;
+ }
+
+ if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ return -1;
+ } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+ return 1;
+ }
+
+ return 0;
+ }
+ }
+
@VisibleForTesting
public static class PreemptionQueueComparator implements Comparator<TaskWrapper> {
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1a125cb..52f21d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -478,8 +478,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
}
- public long getCurrentAttemptStartTime() {
- return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime();
+ public long getDagStartTime() {
+ return request.getFragmentRuntimeInfo().getDagStartTime();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 6b6fac0..dd5b457 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -113,7 +113,7 @@ public class TestTaskExecutorService {
.build())
.build();
}
-
+
@Test
public void testWaitQueueComparator() throws InterruptedException {
TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000);
@@ -122,7 +122,7 @@ public class TestTaskExecutorService {
TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
@@ -144,7 +144,7 @@ public class TestTaskExecutorService {
r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
@@ -166,7 +166,7 @@ public class TestTaskExecutorService {
r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000);
r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
@@ -175,7 +175,7 @@ public class TestTaskExecutorService {
assertEquals(r1, queue.peek());
assertNull(queue.offer(r4));
assertEquals(r1, queue.peek());
- // offer accepted and r2 gets evicted
+ // offer accepted and r4 gets evicted
assertEquals(r4, queue.offer(r5));
assertEquals(r1, queue.take());
assertEquals(r3, queue.take());
@@ -188,7 +188,7 @@ public class TestTaskExecutorService {
r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
@@ -210,7 +210,7 @@ public class TestTaskExecutorService {
r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
@@ -232,7 +232,7 @@ public class TestTaskExecutorService {
r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.WaitQueueComparator(), 4);
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r2));
http://git-wip-us.apache.org/repos/asf/hive/blob/ddbf944f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
new file mode 100644
index 0000000..ad2a15b
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTaskExecutorService2 {
+ private static Configuration conf;
+ private static Credentials cred = new Credentials();
+
+ private static class MockRequest extends TaskRunnerCallable {
+ private int workTime;
+ private boolean canFinish;
+
+ public MockRequest(SubmitWorkRequestProto requestProto,
+ boolean canFinish, int workTime) {
+ super(requestProto, mock(QueryFragmentInfo.class), conf,
+ new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+ mock(KilledTaskHandler.class), mock(
+ FragmentCompletionHandler.class));
+ this.workTime = workTime;
+ this.canFinish = canFinish;
+ }
+
+ @Override
+ protected TaskRunner2Result callInternal() {
+ System.out.println(super.getRequestId() + " is executing..");
+ try {
+ Thread.sleep(workTime);
+ } catch (InterruptedException e) {
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+ }
+ return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+ }
+
+ @Override
+ public boolean canFinish() {
+ return canFinish;
+ }
+ }
+
+ @Before
+ public void setup() {
+ conf = new Configuration();
+ }
+
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime,
+ int attemptStartTime) {
+ ApplicationId appId = ApplicationId.newInstance(9999, 72);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+ TezTaskID tId = TezTaskID.getInstance(vId, 389);
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+ return SubmitWorkRequestProto
+ .newBuilder()
+ .setFragmentSpec(
+ FragmentSpecProto
+ .newBuilder()
+ .setAttemptNumber(0)
+ .setDagName("MockDag")
+ .setFragmentNumber(fragmentNumber)
+ .setVertexName("MockVertex")
+ .setVertexParallelism(parallelism)
+ .setProcessorDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+ .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
+ .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+ .setContainerIdString("MockContainer_1").setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
+ .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+ .FragmentRuntimeInfo
+ .newBuilder()
+ .setDagStartTime(dagStartTime)
+ .setFirstAttemptStartTime(attemptStartTime)
+ .build())
+ .build();
+ }
+
+ @Test
+ public void testWaitQueueComparator() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+ TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // this offer will be accepted and r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // this offer will be accpeted and r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r3, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r3, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r1, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // offer accepted, r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new TaskExecutorService.FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // offer accepted, r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r4, queue.take());
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+ }
+
+ private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
+ MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+ TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
+ return taskWrapper;
+ }
+}