You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/03/05 13:08:50 UTC

[hive] branch master updated: HIVE-22966: LLAP: Consider including waitTime for comparing attempts in same vertex (Rajesh Balamohan, reviewed by Gopal V)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b3ef2b  HIVE-22966: LLAP: Consider including waitTime for comparing attempts in same vertex (Rajesh Balamohan, reviewed by Gopal V)
9b3ef2b is described below

commit 9b3ef2b452b103d328ee0299b47a525bc3b5ed95
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Mar 5 18:38:36 2020 +0530

    HIVE-22966: LLAP: Consider including waitTime for comparing attempts in same vertex (Rajesh Balamohan, reviewed by Gopal V)
---
 .../impl/comparator/ShortestJobFirstComparator.java      |  7 +++++++
 .../impl/comparator/TestShortestJobFirstComparator.java  | 16 +++++++++++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 21d70cd..9d7af7e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -29,6 +29,13 @@ public class ShortestJobFirstComparator extends LlapQueueComparatorBase {
     // Check if these belong to the same task, and work with withinDagPriority
     if (o1.getQueryId().equals(o2.getQueryId())) {
       // Same Query
+
+      if (fri1.getWithinDagPriority() == fri2.getWithinDagPriority()) {
+        // Equal within-DAG priority: treated as task attempts of the same vertex.
+        // Prefer the attempt that started earlier (it has been waiting longer).
+        return Long.compare(fri1.getCurrentAttemptStartTime(), fri2.getCurrentAttemptStartTime());
+      }
+
       // Within dag priority - lower values indicate higher priority.
       return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
     }
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 02bb361..048e1d7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -200,9 +200,9 @@ public class TestShortestJobFirstComparator {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 1, 10), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 2, 10), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 3, 10), true, 100000);
 
     EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 3);
@@ -214,6 +214,16 @@ public class TestShortestJobFirstComparator {
     // can not queue more requests as queue is full
     TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
     assertEquals(r4, queue.offer(r4, 0));
+
+    // add a task whose currentAttemptStartTime is earlier than that of the existing tasks
+    TaskWrapper r0 = createTaskWrapper(createSubmitWorkRequestProto(0, 1, 0, 10, 0, 10), true, 100000);
+    assertEquals(r3, queue.offer(r0, 0));
+    // ensure that r0 is picked up first, as it started earlier than the others.
+    assertEquals(r0, queue.take());
+
+    // other tasks
+    assertEquals(r1, queue.take());
+    assertEquals(r2, queue.take());
   }
 
   @Test(timeout = 60000)