You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2017/02/03 03:13:24 UTC
hive git commit: HIVE-15779: LLAP: WaitQueue comparators should
return 0 when tasks of the same DAG are of same priority (Rajesh Balamohan
reviewed by Siddharth Seth, Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master feaa65fce -> 5d637a62b
HIVE-15779: LLAP: WaitQueue comparators should return 0 when tasks of the same DAG are of same priority (Rajesh Balamohan reviewed by Siddharth Seth, Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d637a62
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d637a62
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d637a62
Branch: refs/heads/master
Commit: 5d637a62b56eb7a05c115512df8f1057b02793a9
Parents: feaa65f
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 3 08:41:13 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 3 08:42:20 2017 +0530
----------------------------------------------------------------------
.../comparator/FirstInFirstOutComparator.java | 6 +-
.../comparator/ShortestJobFirstComparator.java | 6 +-
.../daemon/impl/TaskExecutorTestHelpers.java | 17 +++-
.../TestFirstInFirstOutComparator.java | 73 ++++++++++-----
.../TestShortestJobFirstComparator.java | 98 ++++++++++++--------
5 files changed, 126 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
index 447fc7b..ae1ca5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
@@ -53,11 +53,7 @@ public class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
if (o1.getQueryId().equals(o2.getQueryId())) {
// Same Query
// Within dag priority - lower values indicate higher priority.
- if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
- return -1;
- } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
- return 1;
- }
+ return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
}
if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 9b6c894..b54f740 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -43,11 +43,7 @@ public class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
if (o1.getQueryId().equals(o2.getQueryId())) {
// Same Query
// Within dag priority - lower values indicate higher priority.
- if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
- return -1;
- } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
- return 1;
- }
+ return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
}
// Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 2cd6542..6506d07 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -90,9 +90,24 @@ public class TaskExecutorTestHelpers {
}
public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+ int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime,
+ long currentAttemptStartTime, String dagName) {
+ return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime,
+ currentAttemptStartTime, 1, dagName);
+ }
+
+ public static SubmitWorkRequestProto createSubmitWorkRequestProto(
int fragmentNumber, int selfAndUpstreamParallelism,
int selfAndUpstreamComplete, long firstAttemptStartTime,
long currentAttemptStartTime, int withinDagPriority) {
+ return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, selfAndUpstreamComplete,
+ firstAttemptStartTime, currentAttemptStartTime, withinDagPriority, "MockDag");
+ }
+
+ public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+ int fragmentNumber, int selfAndUpstreamParallelism,
+ int selfAndUpstreamComplete, long firstAttemptStartTime,
+ long currentAttemptStartTime, int withinDagPriority, String dagName) {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -103,7 +118,7 @@ public class TaskExecutorTestHelpers {
.setWorkSpec(
VertexOrBinary.newBuilder().setVertex(
SignableVertexSpec.newBuilder()
- .setDagName("MockDag")
+ .setDagName(dagName)
.setUser("MockUser")
.setTokenIdentifier("MockToken_1")
.setQueryIdentifier(
http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 5ea62aa..8cce0cb 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -39,14 +39,23 @@ import org.junit.Test;
public class TestFirstInFirstOutComparator {
private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
- int attemptStartTime) {
+ int attemptStartTime) {
// Same priority for all tasks.
return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
}
private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
- int numSelfAndUpstreamComplete, int dagStartTime,
- int attemptStartTime, int withinDagPriority) {
+ int numSelfAndUpstreamComplete, int dagStartTime,
+ int attemptStartTime, int withinDagPriority) {
+ return createRequest(fragmentNumber, numSelfAndUpstreamTasks, numSelfAndUpstreamComplete,
+ dagStartTime, attemptStartTime, withinDagPriority, "MockDag");
+ }
+
+
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+ int numSelfAndUpstreamComplete, int dagStartTime,
+ int attemptStartTime, int withinDagPriority,
+ String dagName) {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -58,22 +67,22 @@ public class TestFirstInFirstOutComparator {
.setFragmentNumber(fragmentNumber)
.setWorkSpec(
VertexOrBinary.newBuilder().setVertex(
- SignableVertexSpec
- .newBuilder()
- .setQueryIdentifier(
- QueryIdentifierProto.newBuilder()
- .setApplicationIdString(appId.toString())
- .setAppAttemptNumber(0)
- .setDagIndex(dagId.getId())
- .build())
- .setVertexIndex(vId.getId())
- .setDagName("MockDag")
- .setVertexName("MockVertex")
- .setUser("MockUser")
- .setTokenIdentifier("MockToken_1")
- .setProcessorDescriptor(
- EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
- .build()).build())
+ SignableVertexSpec
+ .newBuilder()
+ .setQueryIdentifier(
+ QueryIdentifierProto.newBuilder()
+ .setApplicationIdString(appId.toString())
+ .setAppAttemptNumber(0)
+ .setDagIndex(dagId.getId())
+ .build())
+ .setVertexIndex(vId.getId())
+ .setDagName(dagName)
+ .setVertexName("MockVertex")
+ .setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
+ .setProcessorDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+ .build()).build())
.setAmHost("localhost")
.setAmPort(12345)
.setContainerIdString("MockContainer_1")
@@ -240,8 +249,8 @@ public class TestFirstInFirstOutComparator {
assertEquals(r4, queue.peek());
// offer accepted, r1 evicted
assertEquals(r1, queue.offer(r5));
- assertEquals(r4, queue.take());
assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
assertEquals(r3, queue.take());
assertEquals(r2, queue.take());
}
@@ -265,10 +274,28 @@ public class TestFirstInFirstOutComparator {
}
@Test(timeout = 60000)
+ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), true, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), true, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), true, 100000);
+ // All requests use the default "MockDag" name and the same within-DAG priority (10).
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new FirstInFirstOutComparator(), 3);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ // The queue is full and r4 does not outrank the queued tasks, so offer() hands r4 back (not queued).
+ TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000);
+ assertEquals(r4, queue.offer(r4));
+ }
+
+ @Test(timeout = 60000)
public void testWaitQueueComparatorParallelism() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1, "q1"), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1, "q2"), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1, "q3"), false, 100000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new FirstInFirstOutComparator(), 4);
http://git-wip-us.apache.org/repos/asf/hive/blob/5d637a62/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index e82f756..0059d0c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -28,11 +28,11 @@ public class TestShortestJobFirstComparator {
@Test(timeout = 60000)
public void testWaitQueueComparator() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
- TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
- TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), false, 1000000);
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000);
+ TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+ TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -50,11 +50,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+ r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000);
+ r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -72,11 +72,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000, "q1"), true, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900, "q2"), false, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800, "q3"), true, 1000000);
+ r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700, "q4"), false, 1000000);
+ r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -94,11 +94,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+ r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+ r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -116,11 +116,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000);
+ r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000);
+ r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -138,11 +138,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000);
+ r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000);
+ r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
@@ -180,10 +180,28 @@ public class TestShortestJobFirstComparator {
}
@Test(timeout = 60000)
+ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000);
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000);
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new ShortestJobFirstComparator(), 3);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ // can not queue more requests as queue is full
+ TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
+ assertEquals(r4, queue.offer(r4));
+ }
+
+ @Test(timeout = 60000)
public void testWaitQueueComparatorParallelism() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1), false, 100000); // 5 pending
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1, "q1"), false, 100000); // 7 pending
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1, "q2"), false, 100000); // 3 pending
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1, "q3"), false, 100000); // 5 pending
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
@@ -199,12 +217,12 @@ public class TestShortestJobFirstComparator {
@Test(timeout = 60000)
public void testWaitQueueComparatorAging() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200), true, 100000);
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200), true, 100000);
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000);
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200, "q2"), true, 100000);
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200, "q3"), true, 100000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new ShortestJobFirstComparator(), 4);
+ new ShortestJobFirstComparator(), 4);
assertNull(queue.offer(r1));
assertNull(queue.offer(r2));
@@ -215,11 +233,11 @@ public class TestShortestJobFirstComparator {
assertEquals(r3, queue.take());
// priority = 10 / (200 - 100) = 0.01
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000);
+ r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000);
// priority = 20 / (3000 - 100) = 0.0069
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000), true, 100000);
+ r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000, "q2"), true, 100000);
// priority = 30 / (4000 - 100) = 0.0076
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000), true, 100000);
+ r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000, "q3"), true, 100000);
queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);