You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/03/05 13:08:50 UTC
[hive] branch master updated: HIVE-22966: LLAP: Consider including
waitTime for comparing attempts in same vertex (Rajesh Balamohan,
reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3ef2b HIVE-22966: LLAP: Consider including waitTime for comparing attempts in same vertex (Rajesh Balamohan, reviewed by Gopal V)
9b3ef2b is described below
commit 9b3ef2b452b103d328ee0299b47a525bc3b5ed95
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Mar 5 18:38:36 2020 +0530
HIVE-22966: LLAP: Consider including waitTime for comparing attempts in same vertex (Rajesh Balamohan, reviewed by Gopal V)
---
.../impl/comparator/ShortestJobFirstComparator.java | 7 +++++++
.../impl/comparator/TestShortestJobFirstComparator.java | 16 +++++++++++++---
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 21d70cd..9d7af7e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -29,6 +29,13 @@ public class ShortestJobFirstComparator extends LlapQueueComparatorBase {
// Check if these belong to the same query, and if so order them by withinDagPriority
if (o1.getQueryId().equals(o2.getQueryId())) {
// Same Query
+
+ if (fri1.getWithinDagPriority() == fri2.getWithinDagPriority()) {
+ // task_attempt within same vertex.
+ // Choose the attempt that was started earlier
+ return Long.compare(fri1.getCurrentAttemptStartTime(), fri2.getCurrentAttemptStartTime());
+ }
+
// Within dag priority - lower values indicate higher priority.
return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 02bb361..048e1d7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -200,9 +200,9 @@ public class TestShortestJobFirstComparator {
@Test(timeout = 60000)
public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000);
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000);
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000);
+ TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 1, 10), true, 100000);
+ TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 2, 10), true, 100000);
+ TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 3, 10), true, 100000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 3);
@@ -214,6 +214,16 @@ public class TestShortestJobFirstComparator {
// cannot queue more requests as the queue is full
TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
assertEquals(r4, queue.offer(r4, 0));
+
+ // add a task whose currentAttemptStartTime is earlier than that of the existing tasks
+ TaskWrapper r0 = createTaskWrapper(createSubmitWorkRequestProto(0, 1, 0, 10, 0, 10), true, 100000);
+ assertEquals(r3, queue.offer(r0, 0));
+ // ensure that R0 is picked up as it started much earlier.
+ assertEquals(r0, queue.take());
+
+ // other tasks
+ assertEquals(r1, queue.take());
+ assertEquals(r2, queue.take());
}
@Test(timeout = 60000)