You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/20 03:04:19 UTC
hive git commit: HIVE-10759: LLAP: Add aging to wait queue tasks
(Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/llap f0175bc8d -> 37317b11f
HIVE-10759: LLAP: Add aging to wait queue tasks (Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/37317b11
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/37317b11
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/37317b11
Branch: refs/heads/llap
Commit: 37317b11fcb669bbfba95e60fece7ef1a62be287
Parents: f0175bc
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue May 19 18:04:10 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue May 19 18:04:10 2015 -0700
----------------------------------------------------------------------
.../impl/EvictingPriorityBlockingQueue.java | 5 ++
.../llap/daemon/impl/TaskExecutorService.java | 15 +++-
.../llap/daemon/impl/TaskRunnerCallable.java | 10 ++-
.../daemon/impl/TestTaskExecutorService.java | 90 ++++++++++----------
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 6 +-
5 files changed, 75 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 101a69c..926835b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -64,4 +64,9 @@ public class EvictingPriorityBlockingQueue<E> {
public synchronized void remove(E e) {
deque.remove(e);
}
+
+ @Override
+ public synchronized String toString() {
+ return deque.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5323f05..599c759 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -186,6 +186,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
LOG.info(task.getRequestId() + " added to wait queue.");
}
+ if (isDebugEnabled) {
+ LOG.debug("Wait Queue: {}", waitQueue);
+ }
synchronized (waitLock) {
waitLock.notify();
}
@@ -227,7 +230,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) {
if (isDebugEnabled) {
- LOG.trace("preemptionQueue: " + preemptionQueue);
+ LOG.debug("Preemption Queue: " + preemptionQueue);
}
TaskRunnerCallable pRequest = preemptionQueue.remove();
@@ -334,10 +337,16 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
return 1;
}
- if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+ if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ return -1;
+ } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
return 1;
- } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+ }
+
+ if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
return -1;
+ } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+ return 1;
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d1b1c61..94512d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -98,7 +98,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private volatile long startTime;
private volatile String threadName;
private LlapDaemonExecutorMetrics metrics;
- protected String requestId;
+ private final String requestId;
private boolean shouldRunTask = true;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
@@ -335,7 +335,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public String toString() {
- return requestId;
+ return requestId + " {canFinish: " + canFinish() +
+ " vertexParallelism: " + getVertexParallelism() +
+ " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}";
}
@Override
@@ -470,4 +472,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private String getTaskAttemptId(SubmitWorkRequestProto request) {
return request.getFragmentSpec().getTaskAttemptIdString();
}
+
+ public long getFirstAttemptStartTime() {
+ return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 740a2ca..f0e53a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -80,7 +80,7 @@ public class TestTaskExecutorService {
conf = new Configuration();
}
- private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism) {
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int attemptStartTime) {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -101,16 +101,22 @@ public class TestTaskExecutorService {
.setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost")
.setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
.setContainerIdString("MockContainer_1").setUser("MockUser")
- .setTokenIdentifier("MockToken_1").build();
+ .setTokenIdentifier("MockToken_1")
+ .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+ .FragmentRuntimeInfo
+ .newBuilder()
+ .setFirstAttemptStartTime(attemptStartTime)
+ .build())
+ .build();
}
@Test
public void testWaitQueueComparator() throws InterruptedException {
- MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000);
- MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000);
- MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000);
- MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000);
- MockRequest r5 = new MockRequest(createRequest(5, 10), false, 1000000);
+ MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+ MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+ MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+ MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+ MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000);
EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -128,11 +134,11 @@ public class TestTaskExecutorService {
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
- r1 = new MockRequest(createRequest(1, 2), true, 100000);
- r2 = new MockRequest(createRequest(2, 4), true, 100000);
- r3 = new MockRequest(createRequest(3, 6), true, 1000000);
- r4 = new MockRequest(createRequest(4, 8), true, 1000000);
- r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+ r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+ r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
+ r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+ r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
+ r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -150,11 +156,11 @@ public class TestTaskExecutorService {
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
- r1 = new MockRequest(createRequest(1, 1), true, 100000);
- r2 = new MockRequest(createRequest(2, 1), false, 100000);
- r3 = new MockRequest(createRequest(3, 1), true, 1000000);
- r4 = new MockRequest(createRequest(4, 1), false, 1000000);
- r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+ r1 = new MockRequest(createRequest(1, 1, 100), true, 100000);
+ r2 = new MockRequest(createRequest(2, 1, 200), false, 100000);
+ r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000);
+ r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000);
+ r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -162,23 +168,21 @@ public class TestTaskExecutorService {
assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
assertNull(queue.offer(r3));
- // same priority with r1
- assertEquals(r3, queue.peek());
- // same priority with r2
+ assertEquals(r1, queue.peek());
assertNull(queue.offer(r4));
- assertEquals(r3, queue.peek());
+ assertEquals(r1, queue.peek());
// offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
- assertEquals(r3, queue.take());
+ assertEquals(r4, queue.offer(r5));
assertEquals(r1, queue.take());
+ assertEquals(r3, queue.take());
assertEquals(r5, queue.take());
- assertEquals(r4, queue.take());
+ assertEquals(r2, queue.take());
- r1 = new MockRequest(createRequest(1, 2), true, 100000);
- r2 = new MockRequest(createRequest(2, 4), false, 100000);
- r3 = new MockRequest(createRequest(3, 6), true, 1000000);
- r4 = new MockRequest(createRequest(4, 8), false, 1000000);
- r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+ r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+ r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+ r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+ r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+ r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -196,11 +200,11 @@ public class TestTaskExecutorService {
assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
- r1 = new MockRequest(createRequest(1, 2), true, 100000);
- r2 = new MockRequest(createRequest(2, 4), false, 100000);
- r3 = new MockRequest(createRequest(3, 6), false, 1000000);
- r4 = new MockRequest(createRequest(4, 8), false, 1000000);
- r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+ r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
+ r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+ r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+ r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
+ r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -218,11 +222,11 @@ public class TestTaskExecutorService {
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
- r1 = new MockRequest(createRequest(1, 2), false, 100000);
- r2 = new MockRequest(createRequest(2, 4), true, 100000);
- r3 = new MockRequest(createRequest(3, 6), true, 1000000);
- r4 = new MockRequest(createRequest(4, 8), true, 1000000);
- r5 = new MockRequest(createRequest(5, 10), true, 1000000);
+ r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+ r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
+ r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
+ r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
+ r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
assertNull(queue.offer(r1));
@@ -243,10 +247,10 @@ public class TestTaskExecutorService {
@Test
public void testPreemptionQueueComparator() throws InterruptedException {
- MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000);
- MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000);
- MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000);
- MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000);
+ MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
+ MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
+ MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
+ MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
BlockingQueue queue = new PriorityBlockingQueue(4,
new TaskExecutorService.PreemptionQueueComparator());
queue.offer(r1);
http://git-wip-us.apache.org/repos/asf/hive/blob/37317b11/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 755f847..4e74a46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -75,14 +75,14 @@ import jline.TerminalFactory;
public class TezJobMonitor {
private static final String CLASS_NAME = TezJobMonitor.class.getName();
- private static final int MIN_TERMINAL_WIDTH = 92;
+ private static final int MIN_TERMINAL_WIDTH = 94;
private static final int COLUMN_1_WIDTH = 16;
private static final int SEPARATOR_WIDTH = MIN_TERMINAL_WIDTH;
// keep this within 80 chars width. If more columns needs to be added then update min terminal
// width requirement and separator width accordingly
- private static final String HEADER_FORMAT = "%16s%10s %11s %5s %9s %7s %7s %6s %6s ";
- private static final String VERTEX_FORMAT = "%-16s%10s %11s %5s %9s %7s %7s %6s %6s ";
+ private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s ";
+ private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s ";
private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
private static final String HEADER = String.format(HEADER_FORMAT,
"VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");