You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/17 16:02:15 UTC

[hive] branch master updated: HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e85731c  HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)
e85731c is described below

commit e85731c42b6485412deefccf85f17e3ae9e0f403
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Sun May 17 09:01:21 2020 -0700

    HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/daemon/impl/TaskExecutorService.java |  22 +++-
 .../llap/daemon/impl/TestTaskExecutorService.java  | 134 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 4 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index d8b517d..1d6e852 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -867,7 +867,8 @@ public class TaskExecutorService extends AbstractService
     return sc;
   }
 
-  private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
+  @VisibleForTesting
+  void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
     synchronized (lock) {
       LOG.debug("Fragment {} guaranteed state changed to {}; finishable {}, in wait queue {}, "
           + "in preemption queue {}", taskWrapper.getRequestId(), taskWrapper.isGuaranteed(),
@@ -884,10 +885,20 @@ public class TaskExecutorService extends AbstractService
         taskWrapper.updateCanFinishForPriority(newFinishableState);
         forceReinsertIntoQueue(taskWrapper, isRemoved);
       } else {
-        taskWrapper.updateCanFinishForPriority(newFinishableState);
-        if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) {
-          // No need to check guaranteed here; if it was false we would already be in the queue.
+        // Speculative tasks are never guaranteed, so any finishable-state change for a non-guaranteed task must
+        // re-order the preemption queue (re-insertion puts non-finishable tasks ahead of finishable ones).
+        if (!taskWrapper.isGuaranteed()) {
+          removeFromPreemptionQueue(taskWrapper);
+          taskWrapper.updateCanFinishForPriority(newFinishableState);
           addToPreemptionQueue(taskWrapper);
+        } else {
+          // For a guaranteed task: if it became non-finishable and is not already in the pre-emption queue,
+          // add it so that it becomes a candidate for preemption (kill).
+          taskWrapper.updateCanFinishForPriority(newFinishableState);
+          if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) {
+            // No need to check guaranteed here; if it was false we would already be in the queue.
+            addToPreemptionQueue(taskWrapper);
+          }
         }
       }
 
@@ -896,6 +907,9 @@ public class TaskExecutorService extends AbstractService
   }
 
   private void addToPreemptionQueue(TaskWrapper taskWrapper) {
+    if (taskWrapper.isInPreemptionQueue()) {
+      return;
+    }
     synchronized (lock) {
       insertIntoPreemptionQueueOrFailUnlocked(taskWrapper);
       taskWrapper.setIsInPreemptableQueue(true);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index ce9fce9..ff61fdd 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -236,6 +236,140 @@ public class TestTaskExecutorService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testPreemptionQueueOnFinishableStateUpdates() throws InterruptedException {
+    // Verifies preemption-queue ordering as finishable state changes and as fragments are killed.
+    long r1WorkTime = 1000L;
+    long r2WorkTime = 2000L;
+    long r3WorkTime = 2000L;
+    // all tasks start with non-finishable state
+    MockRequest r1 = createMockRequest(1, 2, 100, 200, false, r1WorkTime, false);
+    MockRequest r2 = createMockRequest(2, 1, 100, 200, false, r2WorkTime, false);
+    MockRequest r3 = createMockRequest(3, 3, 50, 200, false, r3WorkTime, false);
+
+    TaskExecutorServiceForTest taskExecutorService =
+      new TaskExecutorServiceForTest(4, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics);
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+
+    try {
+      String fragmentId1 = r1.getRequestId();
+      Scheduler.SubmissionState submissionState1 = taskExecutorService.schedule(r1);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState1);
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+      String fragmentId2 = r2.getRequestId();
+      Scheduler.SubmissionState submissionState2 = taskExecutorService.schedule(r2);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState2);
+      awaitStartAndSchedulerRun(r2, taskExecutorService);
+
+      String fragmentId3 = r3.getRequestId();
+      Scheduler.SubmissionState submissionState3 = taskExecutorService.schedule(r3);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState3);
+      awaitStartAndSchedulerRun(r3, taskExecutorService);
+
+      TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // all tasks are non-finishable; r2 has the fewest tasks, so it is first
+      assertEquals(fragmentId2, taskWrapper.getRequestId());
+      assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+      // to let us set the finishable state for tests
+      r1.setCanUpdateFinishable();
+      r2.setCanUpdateFinishable();
+      r3.setCanUpdateFinishable();
+
+      TaskWrapper taskWrapper1 = taskExecutorService.knownTasks.get(fragmentId1);
+      TaskWrapper taskWrapper2 = taskExecutorService.knownTasks.get(fragmentId2);
+      TaskWrapper taskWrapper3 = taskExecutorService.knownTasks.get(fragmentId3);
+
+      // r2 is finishable now, so it should go to the back of the pre-emption queue.
+      taskExecutorService.finishableStateUpdated(taskWrapper2, true);
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // r1 is smallest among non-finishables, so should be first in queue
+      assertEquals(fragmentId1, taskWrapper.getRequestId());
+      assertFalse(taskWrapper.canFinishForPriority());
+      assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+      // r1 is finishable now, so it should go to the back of the pre-emption queue.
+      taskExecutorService.finishableStateUpdated(taskWrapper1, true);
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // r3 is the only non-finishable
+      assertEquals(fragmentId3, taskWrapper.getRequestId());
+      assertFalse(taskWrapper.canFinishForPriority());
+      assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+      // r3 is finishable now, so it should go to the back of the pre-emption queue.
+      taskExecutorService.finishableStateUpdated(taskWrapper3, true);
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // no more non-finishables left, r2 is smallest among the finishables
+      assertEquals(fragmentId2, taskWrapper.getRequestId());
+      assertTrue(taskWrapper.canFinishForPriority());
+      assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+      // double notification test (nothing should change from the above sequence)
+      taskExecutorService.finishableStateUpdated(taskWrapper3, true);
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // no more non-finishables left, r2 is smallest among the finishables
+      assertEquals(fragmentId2, taskWrapper.getRequestId());
+      assertTrue(taskWrapper.canFinishForPriority());
+      assertEquals(3, taskExecutorService.preemptionQueue.size());
+
+      // remove r2 from scheduler
+      taskExecutorService.killFragment(fragmentId2);
+
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // no more non-finishables left, r1 is the smallest among the finishables
+      assertEquals(fragmentId1, taskWrapper.getRequestId());
+      assertTrue(taskWrapper.canFinishForPriority());
+      assertEquals(2, taskExecutorService.preemptionQueue.size());
+
+      // make r3 non-finishable and make sure it's at the top of the queue
+      taskExecutorService.finishableStateUpdated(taskWrapper3, false);
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // r3 is non-finishable and should be at top
+      assertEquals(fragmentId3, taskWrapper.getRequestId());
+      assertFalse(taskWrapper.canFinishForPriority());
+      assertEquals(2, taskExecutorService.preemptionQueue.size());
+      // make sure the task is not added twice to the pre-emption queue
+      taskExecutorService.tryScheduleUnderLock(taskWrapper);
+      assertEquals(2, taskExecutorService.preemptionQueue.size());
+
+      // remove r3 from scheduler
+      taskExecutorService.killFragment(fragmentId3);
+
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+      // r1 is the only one left in queue and is finishable
+      assertEquals(fragmentId1, taskWrapper.getRequestId());
+      assertTrue(taskWrapper.canFinishForPriority());
+      assertEquals(1, taskExecutorService.preemptionQueue.size());
+
+      // remove r1 from scheduler
+      taskExecutorService.killFragment(fragmentId1);
+
+      // no more left in queue
+      taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNull(taskWrapper);
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
   // Tests wait queue behaviour for fragments which have reported to the AM, but have not given up their executor slot.
   @Test (timeout = 10000)
   public void testWaitQueueAcceptAfterAMTaskReport() throws