You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/31 23:55:47 UTC

hive git commit: HIVE-11660. LLAP: TestTaskExecutorService, TestLlapTaskSchedulerService are flaky. Also fixes a real scheduling issue in LlapTaskSchedulerService. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap 025765382 -> 2faf01eed


HIVE-11660. LLAP: TestTaskExecutorService, TestLlapTaskSchedulerService are flaky. Also fixes a real scheduling issue in LlapTaskSchedulerService. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2faf01ee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2faf01ee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2faf01ee

Branch: refs/heads/llap
Commit: 2faf01eedaa32469f06d5408f599823e18096e28
Parents: 0257653
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Aug 31 14:54:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Aug 31 14:54:35 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   | 73 +++++++++++---------
 .../dag/app/rm/LlapTaskSchedulerService.java    | 12 +++-
 .../daemon/impl/TaskExecutorTestHelpers.java    |  7 +-
 .../daemon/impl/TestTaskExecutorService.java    | 48 +++++++++++--
 .../app/rm/TestLlapTaskSchedulerService.java    | 19 ++---
 5 files changed, 110 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index badeb63..875aef6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -223,6 +223,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
 
 
         while (!isShutdown.get()) {
+          RejectedExecutionException rejectedException = null;
           synchronized (lock) {
             // Since schedule() can be called from multiple threads, we peek the wait queue,
             // try scheduling the task and then remove the task if scheduling is successful.
@@ -259,17 +260,21 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
               // queue again to pick up the task at the highest priority.
               continue;
             }
-          }
-
-          boolean scheduled = trySchedule(task);
-          if (scheduled) {
-            // wait queue could have been re-ordered in the mean time because of concurrent task
-            // submission. So remove the specific task instead of the head task.
-            synchronized (lock) {
+            try {
+              trySchedule(task);
+              // wait queue could have been re-ordered in the mean time because of concurrent task
+              // submission. So remove the specific task instead of the head task.
               waitQueue.remove(task);
+            } catch (RejectedExecutionException e) {
+              rejectedException = e;
             }
           }
 
+          // Handle the rejection outside of the lock
+          if (rejectedException !=null) {
+            handleScheduleAttemptedRejection(task);
+          }
+
           synchronized (lock) {
             while (waitQueue.isEmpty()) {
               if (!isShutdown.get()) {
@@ -318,6 +323,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     synchronized (lock) {
       // If the queue does not have capacity, it does not throw a Rejection. Instead it will
       // return the task with the lowest priority, which could be the task which is currently being processed.
+
+      // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the
+      // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
+      // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
       evictedTask = waitQueue.offer(taskWrapper);
       if (evictedTask != taskWrapper) {
         knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
@@ -392,10 +401,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     }
   }
 
-  private boolean trySchedule(final TaskWrapper taskWrapper) {
+  private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
 
-    boolean scheduled = false;
-    try {
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
         LOG.info("Attempting to execute {}", taskWrapper);
@@ -423,37 +430,35 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         }
       }
       numSlotsAvailable.decrementAndGet();
-      scheduled = true;
-    } catch (RejectedExecutionException e) {
-      if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) {
+  }
 
-        if (isDebugEnabled) {
-          LOG.debug("Preemption Queue: " + preemptionQueue);
-        }
+  private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
+    if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) {
 
-        TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+      if (isDebugEnabled) {
+        LOG.debug("Preemption Queue: " + preemptionQueue);
+      }
 
-        // Avoid preempting tasks which are finishable - callback still to be processed.
-        if (pRequest != null) {
-          if (pRequest.getTaskRunnerCallable().canFinish()) {
-            LOG.info(
-                "Removed {} from preemption queue, but not preempting since it's now finishable",
-                pRequest.getRequestId());
-          } else {
-            if (isInfoEnabled) {
-              LOG.info("Invoking kill task for {} due to pre-emption to run {}",
-                  pRequest.getRequestId(), taskWrapper.getRequestId());
-            }
-            // The task will either be killed or is already in the process of completing, which will
-            // trigger the next scheduling run, or result in available slots being higher than 0,
-            // which will cause the scheduler loop to continue.
-            pRequest.getTaskRunnerCallable().killTask();
+      TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+
+      // Avoid preempting tasks which are finishable - callback still to be processed.
+      if (pRequest != null) {
+        if (pRequest.getTaskRunnerCallable().canFinish()) {
+          LOG.info(
+              "Removed {} from preemption queue, but not preempting since it's now finishable",
+              pRequest.getRequestId());
+        } else {
+          if (isInfoEnabled) {
+            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+                pRequest.getRequestId(), taskWrapper.getRequestId());
           }
+          // The task will either be killed or is already in the process of completing, which will
+          // trigger the next scheduling run, or result in available slots being higher than 0,
+          // which will cause the scheduler loop to continue.
+          pRequest.getTaskRunnerCallable().killTask();
         }
       }
     }
-
-    return scheduled;
   }
 
   private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 38d42b9..7fb9a99 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -123,6 +123,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final Lock scheduleLock = new ReentrantLock();
   private final Condition scheduleCondition = scheduleLock.newCondition();
+  private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false);
   private final ListeningExecutorService schedulerExecutor;
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
@@ -910,6 +911,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private void trySchedulingPendingTasks() {
     scheduleLock.lock();
     try {
+      pendingScheduleInvodations.set(true);
       scheduleCondition.signal();
     } finally {
       scheduleLock.unlock();
@@ -924,7 +926,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         scheduleLock.lock();
         try {
-          scheduleCondition.await();
+          while (!pendingScheduleInvodations.get()) {
+            scheduleCondition.await();
+          }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
             LOG.info("Scheduler thread interrupted after shutdown");
@@ -936,6 +940,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         } finally {
           scheduleLock.unlock();
         }
+
+        // Set pending to false since scheduling is about to run. Any triggers up to this point
+        // will be handled in the next run.
+        // A new request may come in right after this is set to false, but before the actual scheduling.
+        // This will be handled in this run, but will cause an immediate run after, which is harmless.
+        pendingScheduleInvodations.set(false);
         // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
         schedulePendingTasks();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index ec1ffcf..38af07e 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -116,6 +116,7 @@ public class TaskExecutorTestHelpers {
     private final ReentrantLock lock = new ReentrantLock();
     private final Condition startedCondition = lock.newCondition();
     private final Condition sleepCondition = lock.newCondition();
+    private boolean shouldSleep = true;
     private final Condition finishedCondition = lock.newCondition();
 
     public MockRequest(SubmitWorkRequestProto requestProto,
@@ -143,7 +144,9 @@ public class TaskExecutorTestHelpers {
 
         lock.lock();
         try {
-          sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+          if (shouldSleep) {
+            sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+          }
         } catch (InterruptedException e) {
           wasInterrupted.set(true);
           return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
@@ -171,6 +174,7 @@ public class TaskExecutorTestHelpers {
       lock.lock();
       try {
         wasKilled.set(true);
+        shouldSleep = false;
         sleepCondition.signal();
       } finally {
         lock.unlock();
@@ -192,6 +196,7 @@ public class TaskExecutorTestHelpers {
     void complete() {
       lock.lock();
       try {
+        shouldSleep = false;
         sleepCondition.signal();
       } finally {
         lock.unlock();

http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 34ab40a..cb2d0e9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -81,6 +83,7 @@ public class TestTaskExecutorService {
       r2.awaitStart();
       // Verify r1 was preempted. Also verify that it finished (single executor), otherwise
       // r2 could have run anyway.
+      r1.awaitEnd();
       assertTrue(r1.wasPreempted());
       assertTrue(r1.hasFinished());
 
@@ -117,6 +120,9 @@ public class TestTaskExecutorService {
 
     try {
       taskExecutorService.schedule(r1);
+
+      // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
+      // This currently serves to allow the task to be removed from the waitQueue.
       r1.awaitStart();
       try {
         taskExecutorService.schedule(r2);
@@ -154,6 +160,7 @@ public class TestTaskExecutorService {
       assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId()));
 
       r1.complete();
+      r1.awaitEnd();
       icl1.awaitCompletion();
 
       // Two known tasks left. r2 and r5. (r1 complete, r3 evicted, r4 rejected)
@@ -165,6 +172,7 @@ public class TestTaskExecutorService {
       TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 =
           taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId());
       r5.complete();
+      r5.awaitEnd();
       icl5.awaitCompletion();
 
       // 1 Pending task which is not finishable
@@ -175,6 +183,7 @@ public class TestTaskExecutorService {
       TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 =
           taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId());
       r2.complete();
+      r2.awaitEnd();
       icl2.awaitCompletion();
       // 0 Pending task which is not finishable
       assertEquals(0, taskExecutorService.knownTasks.size());
@@ -187,6 +196,10 @@ public class TestTaskExecutorService {
 
 
   private static class TaskExecutorServiceForTest extends TaskExecutorService {
+
+    private final Lock iclCreationLock = new ReentrantLock();
+    private final Map<String, Condition> iclCreationConditions = new HashMap<>();
+
     public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
                                       boolean enablePreemption) {
       super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption);
@@ -196,13 +209,38 @@ public class TestTaskExecutorService {
 
     @Override
     InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) {
-      InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper);
-      completionListeners.put(taskWrapper.getRequestId(), icl);
-      return icl;
+      iclCreationLock.lock();
+      try {
+        InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper);
+        completionListeners.put(taskWrapper.getRequestId(), icl);
+        Condition condition = iclCreationConditions.get(taskWrapper.getRequestId());
+        if (condition == null) {
+          condition = iclCreationLock.newCondition();
+          iclCreationConditions.put(taskWrapper.getRequestId(), condition);
+        }
+        condition.signalAll();
+        return icl;
+      } finally {
+        iclCreationLock.unlock();
+      }
     }
 
-    InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) {
-      return completionListeners.get(requestId);
+    InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) throws
+        InterruptedException {
+      iclCreationLock.lock();
+      try {
+        Condition condition = iclCreationConditions.get(requestId);
+        if (condition == null) {
+          condition = iclCreationLock.newCondition();
+          iclCreationConditions.put(requestId, condition);
+        }
+        while (completionListeners.get(requestId) == null) {
+          condition.await();
+        }
+        return completionListeners.get(requestId);
+      } finally {
+        iclCreationLock.unlock();
+      }
     }
 
     private class InternalCompletionListenerForTest extends TaskExecutorService.InternalCompletionListener {

http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index ce60e6e..2f93266 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -343,9 +343,12 @@ public class TestLlapTaskSchedulerService {
 
       ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
 
+      controlScheduler(true);
       ts.initialize();
       ts.start();
       // One scheduler pass from the nodes that are added at startup
+      signalSchedulerRun();
+      controlScheduler(false);
       awaitSchedulerRun();
     }
 
@@ -386,9 +389,9 @@ public class TestLlapTaskSchedulerService {
     private AtomicBoolean controlScheduling = new AtomicBoolean(false);
     private final Lock testLock = new ReentrantLock();
     private final Condition schedulingCompleteCondition = testLock.newCondition();
-    private final AtomicBoolean schedulingComplete = new AtomicBoolean(false);
+    private boolean schedulingComplete = false;
     private final Condition triggerSchedulingCondition = testLock.newCondition();
-    private final AtomicBoolean schedulingTriggered = new AtomicBoolean(false);
+    private boolean schedulingTriggered = false;
     private final AtomicInteger numSchedulerRuns = new AtomicInteger(0);
 
 
@@ -402,7 +405,7 @@ public class TestLlapTaskSchedulerService {
       testLock.lock();
       try {
         if (controlScheduling.get()) {
-          while (!schedulingTriggered.get()) {
+          while (!schedulingTriggered) {
             try {
               triggerSchedulingCondition.await();
             } catch (InterruptedException e) {
@@ -412,8 +415,8 @@ public class TestLlapTaskSchedulerService {
         }
         numSchedulerRuns.incrementAndGet();
         super.schedulePendingTasks();
-        schedulingTriggered.set(false);
-        schedulingComplete.set(true);
+        schedulingTriggered = false;
+        schedulingComplete = true;
         schedulingCompleteCondition.signal();
       } finally {
         testLock.unlock();
@@ -428,7 +431,7 @@ public class TestLlapTaskSchedulerService {
     void forTestSignalSchedulingRun() throws InterruptedException {
       testLock.lock();
       try {
-        schedulingTriggered.set(true);
+        schedulingTriggered = true;
         triggerSchedulingCondition.signal();
       } finally {
         testLock.unlock();
@@ -438,10 +441,10 @@ public class TestLlapTaskSchedulerService {
     void forTestAwaitSchedulingRun() throws InterruptedException {
       testLock.lock();
       try {
-        while (!schedulingComplete.get()) {
+        while (!schedulingComplete) {
           schedulingCompleteCondition.await();
         }
-        schedulingComplete.set(false);
+        schedulingComplete = false;
       } finally {
         testLock.unlock();
       }