You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/20 03:04:19 UTC

hive git commit: HIVE-10759: LLAP: Add aging to wait queue tasks (Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/llap f0175bc8d -> 37317b11f


HIVE-10759: LLAP: Add aging to wait queue tasks (Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/37317b11
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/37317b11
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/37317b11

Branch: refs/heads/llap
Commit: 37317b11fcb669bbfba95e60fece7ef1a62be287
Parents: f0175bc
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue May 19 18:04:10 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue May 19 18:04:10 2015 -0700

----------------------------------------------------------------------
 .../impl/EvictingPriorityBlockingQueue.java     |  5 ++
 .../llap/daemon/impl/TaskExecutorService.java   | 15 +++-
 .../llap/daemon/impl/TaskRunnerCallable.java    | 10 ++-
 .../daemon/impl/TestTaskExecutorService.java    | 90 ++++++++++----------
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |  6 +-
 5 files changed, 75 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 101a69c..926835b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -64,4 +64,9 @@ public class EvictingPriorityBlockingQueue<E> {
   public synchronized void remove(E e) {
     deque.remove(e);
   }
+
+  @Override
+  public synchronized String toString() {
+    return deque.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5323f05..599c759 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -186,6 +186,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
         LOG.info(task.getRequestId() + " added to wait queue.");
       }
 
+      if (isDebugEnabled) {
+        LOG.debug("Wait Queue: {}", waitQueue);
+      }
       synchronized (waitLock) {
         waitLock.notify();
       }
@@ -227,7 +230,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) {
 
         if (isDebugEnabled) {
-          LOG.trace("preemptionQueue: " + preemptionQueue);
+          LOG.debug("Preemption Queue: " + preemptionQueue);
         }
 
         TaskRunnerCallable pRequest = preemptionQueue.remove();
@@ -334,10 +337,16 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
         return 1;
       }
 
-      if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+        return -1;
+      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
         return 1;
-      } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      }
+
+      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
         return -1;
+      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+        return 1;
       }
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d1b1c61..94512d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -98,7 +98,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private volatile long startTime;
   private volatile String threadName;
   private LlapDaemonExecutorMetrics metrics;
-  protected String requestId;
+  private final String requestId;
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
@@ -335,7 +335,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
   @Override
   public String toString() {
-    return requestId;
+    return requestId + " {canFinish: " + canFinish() +
+        " vertexParallelism: " + getVertexParallelism() +
+        " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}";
   }
 
   @Override
@@ -470,4 +472,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private String getTaskAttemptId(SubmitWorkRequestProto request) {
     return request.getFragmentSpec().getTaskAttemptIdString();
   }
+
+  public long getFirstAttemptStartTime() {
+    return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 740a2ca..f0e53a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -80,7 +80,7 @@ public class TestTaskExecutorService {
     conf = new Configuration();
   }
 
-  private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism) {
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int attemptStartTime) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -101,16 +101,22 @@ public class TestTaskExecutorService {
                 .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost")
         .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
         .setContainerIdString("MockContainer_1").setUser("MockUser")
-        .setTokenIdentifier("MockToken_1").build();
+        .setTokenIdentifier("MockToken_1")
+        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+            .FragmentRuntimeInfo
+            .newBuilder()
+            .setFirstAttemptStartTime(attemptStartTime)
+            .build())
+        .build();
   }
   
   @Test
   public void testWaitQueueComparator() throws InterruptedException {
-    MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000);
-    MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000);
-    MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000);
-    MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000);
-    MockRequest r5 = new MockRequest(createRequest(5, 10), false, 1000000);
+    MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+    MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+    MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+    MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+    MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000);
     EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -128,11 +134,11 @@ public class TestTaskExecutorService {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4), true, 100000);
-    r3 = new MockRequest(createRequest(3, 6), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8), true, 1000000);
-    r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+    r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
+    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+    r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
+    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -150,11 +156,11 @@ public class TestTaskExecutorService {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 1), true, 100000);
-    r2 = new MockRequest(createRequest(2, 1), false, 100000);
-    r3 = new MockRequest(createRequest(3, 1), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 1), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+    r1 = new MockRequest(createRequest(1, 1, 100), true, 100000);
+    r2 = new MockRequest(createRequest(2, 1, 200), false, 100000);
+    r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000);
+    r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000);
+    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -162,23 +168,21 @@ public class TestTaskExecutorService {
     assertNull(queue.offer(r2));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r3));
-    // same priority with r1
-    assertEquals(r3, queue.peek());
-    // same priority with r2
+    assertEquals(r1, queue.peek());
     assertNull(queue.offer(r4));
-    assertEquals(r3, queue.peek());
+    assertEquals(r1, queue.peek());
     // offer accepted and r2 gets evicted
-    assertEquals(r2, queue.offer(r5));
-    assertEquals(r3, queue.take());
+    assertEquals(r4, queue.offer(r5));
     assertEquals(r1, queue.take());
+    assertEquals(r3, queue.take());
     assertEquals(r5, queue.take());
-    assertEquals(r4, queue.take());
+    assertEquals(r2, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4), false, 100000);
-    r3 = new MockRequest(createRequest(3, 6), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+    r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+    r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -196,11 +200,11 @@ public class TestTaskExecutorService {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4), false, 100000);
-    r3 = new MockRequest(createRequest(3, 6), false, 1000000);
-    r4 = new MockRequest(createRequest(4, 8), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+    r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+    r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+    r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -218,11 +222,11 @@ public class TestTaskExecutorService {
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2), false, 100000);
-    r2 = new MockRequest(createRequest(2, 4), true, 100000);
-    r3 = new MockRequest(createRequest(3, 6), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8), true, 1000000);
-    r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+    r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+    r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
+    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+    r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
+    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
     queue = new EvictingPriorityBlockingQueue(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -243,10 +247,10 @@ public class TestTaskExecutorService {
 
   @Test
   public void testPreemptionQueueComparator() throws InterruptedException {
-    MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000);
-    MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000);
-    MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000);
-    MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000);
+    MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+    MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+    MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+    MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
     BlockingQueue queue = new PriorityBlockingQueue(4,
         new TaskExecutorService.PreemptionQueueComparator());
     queue.offer(r1);

http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 755f847..4e74a46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -75,14 +75,14 @@ import jline.TerminalFactory;
 public class TezJobMonitor {
 
   private static final String CLASS_NAME = TezJobMonitor.class.getName();
-  private static final int MIN_TERMINAL_WIDTH = 92;
+  private static final int MIN_TERMINAL_WIDTH = 94;
   private static final int COLUMN_1_WIDTH = 16;
   private static final int SEPARATOR_WIDTH = MIN_TERMINAL_WIDTH;
 
   // keep this within 80 chars width. If more columns needs to be added then update min terminal
   // width requirement and separator width accordingly
-  private static final String HEADER_FORMAT = "%16s%10s %11s  %5s  %9s  %7s  %7s  %6s  %6s  ";
-  private static final String VERTEX_FORMAT = "%-16s%10s %11s  %5s  %9s  %7s  %7s  %6s  %6s  ";
+  private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
+  private static final String VERTEX_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
   private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
   private static final String HEADER = String.format(HEADER_FORMAT,
       "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");