You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/14 03:03:03 UTC
hive git commit: HIVE-11272. LLAP: Execution order within LLAP
daemons should consider query-specific priority assigned to fragments.
(Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 3bf0a45f8 -> a8ac648c8
HIVE-11272. LLAP: Execution order within LLAP daemons should consider query-specific priority assigned to fragments. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8ac648c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8ac648c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8ac648c
Branch: refs/heads/llap
Commit: a8ac648c86f11f856b1a307aa0e545482618b769
Parents: 3bf0a45
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 13 18:01:52 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 13 18:01:52 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/impl/TaskExecutorService.java | 94 +++++++++++++++-----
.../llap/daemon/impl/TaskRunnerCallable.java | 36 ++++----
.../daemon/impl/TestTaskExecutorService.java | 50 ++++++++++-
.../daemon/impl/TestTaskExecutorService2.java | 52 ++++++++++-
4 files changed, 187 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5099a5c..f99c05d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.runtime.task.EndReason;
@@ -380,6 +381,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
try {
synchronized (lock) {
boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+ LOG.info("Attempting to execute {}", taskWrapper);
ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable());
taskWrapper.setIsInWaitQueue(false);
FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(taskWrapper);
@@ -584,23 +586,41 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
public int compare(TaskWrapper t1, TaskWrapper t2) {
TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
- boolean newCanFinish = o1.canFinish();
- boolean oldCanFinish = o2.canFinish();
- if (newCanFinish == true && oldCanFinish == false) {
+ boolean o1CanFinish = o1.canFinish();
+ boolean o2CanFinish = o2.canFinish();
+ if (o1CanFinish == true && o2CanFinish == false) {
return -1;
- } else if (newCanFinish == false && oldCanFinish == true) {
+ } else if (o1CanFinish == false && o2CanFinish == true) {
return 1;
}
- if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+ FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+ // Check if these belong to the same task, and work with withinDagPriority
+ if (o1.getQueryId().equals(o2.getQueryId())) {
+ // Same Query
+ // Within dag priority - lower values indicate higher priority.
+ if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+ return -1;
+ } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+ return 1;
+ }
+ }
+
+ // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+ // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+ int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+ if (knownPending1 < knownPending2) {
return -1;
- } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+ } else if (knownPending1 > knownPending2) {
return 1;
}
- if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+ if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
return -1;
- } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+ } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
return 1;
}
return 0;
@@ -610,8 +630,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// if map tasks and reduce tasks are in finishable state then priority is given to the task in
// the following order
// 1) Dag start time
- // 2) Attempt start time
- // 3) Vertex parallelism
+ // 2) Within dag priority
+ // 3) Attempt start time
+ // 4) Vertex parallelism
@VisibleForTesting
public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
@@ -619,29 +640,47 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
public int compare(TaskWrapper t1, TaskWrapper t2) {
TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
- boolean newCanFinish = o1.canFinish();
- boolean oldCanFinish = o2.canFinish();
- if (newCanFinish == true && oldCanFinish == false) {
+ boolean o1CanFinish = o1.canFinish();
+ boolean o2CanFinish = o2.canFinish();
+ if (o1CanFinish == true && o2CanFinish == false) {
return -1;
- } else if (newCanFinish == false && oldCanFinish == true) {
+ } else if (o1CanFinish == false && o2CanFinish == true) {
return 1;
}
- if (o1.getDagStartTime() < o2.getDagStartTime()) {
+ FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+ FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+ if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
return -1;
- } else if (o1.getDagStartTime() > o2.getDagStartTime()) {
+ } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
return 1;
}
- if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+ // Check if these belong to the same task, and work with withinDagPriority
+ if (o1.getQueryId().equals(o2.getQueryId())) {
+ // Same Query
+ // Within dag priority - lower values indicate higher priority.
+ if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+ return -1;
+ } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+ return 1;
+ }
+ }
+
+ if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
return -1;
- } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+ } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
return 1;
}
- if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+ // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+ int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+ if (knownPending1 < knownPending2) {
return -1;
- } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+ } else if (knownPending1 > knownPending2) {
return 1;
}
@@ -656,9 +695,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
public int compare(TaskWrapper t1, TaskWrapper t2) {
TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
- if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+ FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+ FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+ if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) {
return 1;
- } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) {
return -1;
}
return 0;
@@ -738,8 +780,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
", inPreemptionQueue=" + inPreemptionQueue +
", registeredForNotifications=" + registeredForNotifications +
", canFinish=" + taskRunnerCallable.canFinish() +
- ", firstAttemptStartTime=" + taskRunnerCallable.getFirstAttemptStartTime() +
- ", vertexParallelism=" + taskRunnerCallable.getVertexParallelism() +
+ ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+ ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +
+ ", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() +
+ ", vertexParallelism= " + taskRunnerCallable.getFragmentSpec().getVertexParallelism() +
+ ", selfAndUpstreamParallelism= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+ ", selfAndUpstreamComplete= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
'}';
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 52f21d9..6ceb2e5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.CallableWithNdc;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -96,6 +97,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private volatile String threadName;
private final LlapDaemonExecutorMetrics metrics;
private final String requestId;
+ private final String queryId;
private boolean shouldRunTask = true;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
@@ -129,7 +131,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
}
this.metrics = metrics;
- this.requestId = getRequestId(request);
+ this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+ // TODO Change this to the queryId/Name when that's available.
+ this.queryId = request.getFragmentSpec().getDagName();
this.killedTaskHandler = killedTaskHandler;
this.fragmentCompletionHanler = fragmentCompleteHandler;
}
@@ -330,8 +334,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public String toString() {
return requestId + " {canFinish: " + canFinish() +
- " vertexParallelism: " + getVertexParallelism() +
- " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}";
+ ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() +
+ ", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+ ", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
+ ", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+ ", dagStartTime:" + getFragmentRuntimeInfo().getDagStartTime() +
+ ", withinDagPriority: " + getFragmentRuntimeInfo().getWithinDagPriority() +
+ "}";
}
@Override
@@ -347,14 +356,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return requestId.equals(((TaskRunnerCallable) obj).getRequestId());
}
- public int getVertexParallelism() {
- return request.getFragmentSpec().getVertexParallelism();
- }
-
public String getRequestId() {
return requestId;
}
+ public String getQueryId() {
+ return queryId;
+ }
+
public QueryFragmentInfo getFragmentInfo() {
return fragmentInfo;
}
@@ -470,16 +479,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return sb.toString();
}
- private static String getRequestId(SubmitWorkRequestProto request) {
- return request.getFragmentSpec().getFragmentIdentifierString();
+ public FragmentRuntimeInfo getFragmentRuntimeInfo() {
+ return request.getFragmentRuntimeInfo();
}
- public long getFirstAttemptStartTime() {
- return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
+ public FragmentSpecProto getFragmentSpec() {
+ return request.getFragmentSpec();
}
-
- public long getDagStartTime() {
- return request.getFragmentRuntimeInfo().getDagStartTime();
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 25f7a81..7a01b39 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -67,6 +67,7 @@ public class TestTaskExecutorService {
conf = new Configuration();
}
+
@Test(timeout = 5000)
public void testWaitQueueComparator() throws InterruptedException {
TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
@@ -203,6 +204,42 @@ public class TestTaskExecutorService {
}
@Test(timeout = 5000)
+ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+
+ @Test(timeout = 5000)
+ public void testWaitQueueComparatorParallelism() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+
+ @Test(timeout = 5000)
public void testPreemptionQueueComparator() throws InterruptedException {
TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
@@ -344,8 +381,15 @@ public class TestTaskExecutorService {
// ----------- Helper classes and methods go after this point. Tests above this -----------
- private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int parallelism,
+ // Create requests with the same within dag priority
+ private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
long attemptStartTime) {
+ return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1);
+ }
+
+ private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
+ int selfAndUpstreamComplete,
+ long attemptStartTime, int withinDagPriority) {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -360,7 +404,6 @@ public class TestTaskExecutorService {
.setDagName("MockDag")
.setFragmentNumber(fragmentNumber)
.setVertexName("MockVertex")
- .setVertexParallelism(parallelism)
.setProcessorDescriptor(
EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
.setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -371,6 +414,9 @@ public class TestTaskExecutorService {
.FragmentRuntimeInfo
.newBuilder()
.setFirstAttemptStartTime(attemptStartTime)
+ .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
+ .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
+ .setWithinDagPriority(withinDagPriority)
.build())
.build();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
index ad2a15b..1929439 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
@@ -81,8 +81,15 @@ public class TestTaskExecutorService2 {
conf = new Configuration();
}
- private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime,
- int attemptStartTime) {
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
+ int attemptStartTime) {
+ // Same priority for all tasks.
+ return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
+ }
+
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+ int numSelfAndUpstreamComplete, int dagStartTime,
+ int attemptStartTime, int withinDagPriority) {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -97,7 +104,6 @@ public class TestTaskExecutorService2 {
.setDagName("MockDag")
.setFragmentNumber(fragmentNumber)
.setVertexName("MockVertex")
- .setVertexParallelism(parallelism)
.setProcessorDescriptor(
EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
.setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -109,6 +115,9 @@ public class TestTaskExecutorService2 {
.newBuilder()
.setDagStartTime(dagStartTime)
.setFirstAttemptStartTime(attemptStartTime)
+ .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+ .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+ .setWithinDagPriority(withinDagPriority)
.build())
.build();
}
@@ -270,6 +279,43 @@ public class TestTaskExecutorService2 {
assertEquals(r2, queue.take());
}
+ @Test(timeout = 5000)
+ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+
+ @Test(timeout = 5000)
+ public void testWaitQueueComparatorParallelism() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+
+
private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);