You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/17 16:02:15 UTC
[hive] branch master updated: HIVE-23443 : LLAP speculative task
pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e85731c HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)
e85731c is described below
commit e85731c42b6485412deefccf85f17e3ae9e0f403
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Sun May 17 09:01:21 2020 -0700
HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/llap/daemon/impl/TaskExecutorService.java | 22 +++-
.../llap/daemon/impl/TestTaskExecutorService.java | 134 +++++++++++++++++++++
2 files changed, 152 insertions(+), 4 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index d8b517d..1d6e852 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -867,7 +867,8 @@ public class TaskExecutorService extends AbstractService
return sc;
}
- private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
+ @VisibleForTesting
+ void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
synchronized (lock) {
LOG.debug("Fragment {} guaranteed state changed to {}; finishable {}, in wait queue {}, "
+ "in preemption queue {}", taskWrapper.getRequestId(), taskWrapper.isGuaranteed(),
@@ -884,10 +885,20 @@ public class TaskExecutorService extends AbstractService
taskWrapper.updateCanFinishForPriority(newFinishableState);
forceReinsertIntoQueue(taskWrapper, isRemoved);
} else {
- taskWrapper.updateCanFinishForPriority(newFinishableState);
- if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) {
- // No need to check guaranteed here; if it was false we would already be in the queue.
+ // Non-guaranteed task (speculative tasks are always non-guaranteed): any finishable-state change must
+ // re-order the pre-emption queue, so remove, update priority, re-insert (non-finishable sorts first).
+ if (!taskWrapper.isGuaranteed()) {
+ removeFromPreemptionQueue(taskWrapper);
+ taskWrapper.updateCanFinishForPriority(newFinishableState);
addToPreemptionQueue(taskWrapper);
+ } else {
+ // Guaranteed task: if it just became non-finishable and is not yet in the pre-emption queue,
+ // add it there so that it becomes a candidate to be killed.
+ taskWrapper.updateCanFinishForPriority(newFinishableState);
+ if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) {
+ // isGuaranteed() is true in this branch, so no guaranteed check is needed before adding.
+ addToPreemptionQueue(taskWrapper);
+ }
}
}
@@ -896,6 +907,9 @@ public class TaskExecutorService extends AbstractService
}
private void addToPreemptionQueue(TaskWrapper taskWrapper) {
synchronized (lock) {
+ if (taskWrapper.isInPreemptionQueue()) { // checked under lock so check-then-insert is atomic
+ return; // already queued; never insert the same task twice
+ }
insertIntoPreemptionQueueOrFailUnlocked(taskWrapper);
taskWrapper.setIsInPreemptableQueue(true);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index ce9fce9..ff61fdd 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -236,6 +236,140 @@ public class TestTaskExecutorService {
}
}
+ @Test(timeout = 10000)
+ public void testPreemptionQueueOnFinishableStateUpdates() throws InterruptedException {
+
+ long r1WorkTime = 1000L;
+ long r2WorkTime = 2000L;
+ long r3WorkTime = 2000L;
+ // All tasks start in the non-finishable state.
+ MockRequest r1 = createMockRequest(1, 2, 100, 200, false, r1WorkTime, false);
+ MockRequest r2 = createMockRequest(2, 1, 100, 200, false, r2WorkTime, false);
+ MockRequest r3 = createMockRequest(3, 3, 50, 200, false, r3WorkTime, false);
+
+ TaskExecutorServiceForTest taskExecutorService =
+ new TaskExecutorServiceForTest(4, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics);
+ taskExecutorService.init(new Configuration());
+ taskExecutorService.start();
+
+ try {
+ String fragmentId1 = r1.getRequestId();
+ Scheduler.SubmissionState submissionState1 = taskExecutorService.schedule(r1);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState1);
+ awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+ String fragmentId2 = r2.getRequestId();
+ Scheduler.SubmissionState submissionState2 = taskExecutorService.schedule(r2);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState2);
+ awaitStartAndSchedulerRun(r2, taskExecutorService);
+
+ String fragmentId3 = r3.getRequestId();
+ Scheduler.SubmissionState submissionState3 = taskExecutorService.schedule(r3);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState3);
+ awaitStartAndSchedulerRun(r3, taskExecutorService);
+
+ TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // all tasks are non-finishable; r2 has the fewest tasks, so it is at the head
+ assertEquals(fragmentId2, taskWrapper.getRequestId());
+ assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+ // allow the test to set the finishable state explicitly
+ r1.setCanUpdateFinishable();
+ r2.setCanUpdateFinishable();
+ r3.setCanUpdateFinishable();
+
+ TaskWrapper taskWrapper1 = taskExecutorService.knownTasks.get(fragmentId1);
+ TaskWrapper taskWrapper2 = taskExecutorService.knownTasks.get(fragmentId2);
+ TaskWrapper taskWrapper3 = taskExecutorService.knownTasks.get(fragmentId3);
+
+ // r2 is finishable now, so it should go to the back of the pre-emption queue.
+ taskExecutorService.finishableStateUpdated(taskWrapper2, true);
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // r1 is smallest among non-finishables, so should be first in queue
+ assertEquals(fragmentId1, taskWrapper.getRequestId());
+ assertFalse(taskWrapper.canFinishForPriority());
+ assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+ // r1 is finishable now, so it should go to the back of the pre-emption queue.
+ taskExecutorService.finishableStateUpdated(taskWrapper1, true);
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // r3 is the only non-finishable
+ assertEquals(fragmentId3, taskWrapper.getRequestId());
+ assertFalse(taskWrapper.canFinishForPriority());
+ assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+ // r3 is finishable now, so it should go to the back of the pre-emption queue.
+ taskExecutorService.finishableStateUpdated(taskWrapper3, true);
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // no more non-finishables left, r2 is smallest among the finishables
+ assertEquals(fragmentId2, taskWrapper.getRequestId());
+ assertTrue(taskWrapper.canFinishForPriority());
+ assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+ // double notification test (nothing should change from the above sequence)
+ taskExecutorService.finishableStateUpdated(taskWrapper3, true);
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // no more non-finishables left, r2 is smallest among the finishables
+ assertEquals(fragmentId2, taskWrapper.getRequestId());
+ assertTrue(taskWrapper.canFinishForPriority());
+ assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+ // remove r2 from scheduler
+ taskExecutorService.killFragment(fragmentId2);
+
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // no more non-finishables left, r1 is the smallest among the finishables
+ assertEquals(fragmentId1, taskWrapper.getRequestId());
+ assertTrue(taskWrapper.canFinishForPriority());
+ assertEquals(2, taskExecutorService.preemptionQueue.size());
+
+ // make r3 non-finishable and make sure it's at the top of the queue
+ taskExecutorService.finishableStateUpdated(taskWrapper3, false);
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // r3 is non-finishable and should be at top
+ assertEquals(fragmentId3, taskWrapper.getRequestId());
+ assertFalse(taskWrapper.canFinishForPriority());
+ assertEquals(2, taskExecutorService.preemptionQueue.size());
+ // make sure the task is not added twice to the pre-emption queue
+ taskExecutorService.tryScheduleUnderLock(taskWrapper);
+ assertEquals(2, taskExecutorService.preemptionQueue.size());
+
+ // remove r3 from scheduler
+ taskExecutorService.killFragment(fragmentId3);
+
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNotNull(taskWrapper);
+ assertTrue(taskWrapper.isInPreemptionQueue());
+ // r1 is the only one left in queue and is finishable
+ assertEquals(fragmentId1, taskWrapper.getRequestId());
+ assertTrue(taskWrapper.canFinishForPriority());
+ assertEquals(1, taskExecutorService.preemptionQueue.size());
+
+ // remove r1 from scheduler
+ taskExecutorService.killFragment(fragmentId1);
+
+ // no more left in queue
+ taskWrapper = taskExecutorService.preemptionQueue.peek();
+ assertNull(taskWrapper);
+ } finally {
+ taskExecutorService.shutDown(false);
+ }
+ }
+
// Tests wait queue behaviour for fragments which have reported to the AM, but have not given up their executor slot.
@Test (timeout = 10000)
public void testWaitQueueAcceptAfterAMTaskReport() throws