You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this text.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/31 23:55:47 UTC
hive git commit: HIVE-11660. LLAP: TestTaskExecutorService,
TestLlapTaskSchedulerService are flaky. Also fixes a real scheduling
issue in LlapTaskSchedulerService. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 025765382 -> 2faf01eed
HIVE-11660. LLAP: TestTaskExecutorService, TestLlapTaskSchedulerService are flaky. Also fixes a real scheduling issue in LlapTaskSchedulerService. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2faf01ee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2faf01ee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2faf01ee
Branch: refs/heads/llap
Commit: 2faf01eedaa32469f06d5408f599823e18096e28
Parents: 0257653
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Aug 31 14:54:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Aug 31 14:54:35 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/impl/TaskExecutorService.java | 73 +++++++++++---------
.../dag/app/rm/LlapTaskSchedulerService.java | 12 +++-
.../daemon/impl/TaskExecutorTestHelpers.java | 7 +-
.../daemon/impl/TestTaskExecutorService.java | 48 +++++++++++--
.../app/rm/TestLlapTaskSchedulerService.java | 19 ++---
5 files changed, 110 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index badeb63..875aef6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -223,6 +223,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
while (!isShutdown.get()) {
+ RejectedExecutionException rejectedException = null;
synchronized (lock) {
// Since schedule() can be called from multiple threads, we peek the wait queue,
// try scheduling the task and then remove the task if scheduling is successful.
@@ -259,17 +260,21 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// queue again to pick up the task at the highest priority.
continue;
}
- }
-
- boolean scheduled = trySchedule(task);
- if (scheduled) {
- // wait queue could have been re-ordered in the mean time because of concurrent task
- // submission. So remove the specific task instead of the head task.
- synchronized (lock) {
+ try {
+ trySchedule(task);
+ // wait queue could have been re-ordered in the mean time because of concurrent task
+ // submission. So remove the specific task instead of the head task.
waitQueue.remove(task);
+ } catch (RejectedExecutionException e) {
+ rejectedException = e;
}
}
+ // Handle the rejection outside of the lock
+ if (rejectedException !=null) {
+ handleScheduleAttemptedRejection(task);
+ }
+
synchronized (lock) {
while (waitQueue.isEmpty()) {
if (!isShutdown.get()) {
@@ -318,6 +323,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
synchronized (lock) {
// If the queue does not have capacity, it does not throw a Rejection. Instead it will
// return the task with the lowest priority, which could be the task which is currently being processed.
+
+ // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the
+ // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
+ // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
evictedTask = waitQueue.offer(taskWrapper);
if (evictedTask != taskWrapper) {
knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
@@ -392,10 +401,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
- private boolean trySchedule(final TaskWrapper taskWrapper) {
+ private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
- boolean scheduled = false;
- try {
synchronized (lock) {
boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
LOG.info("Attempting to execute {}", taskWrapper);
@@ -423,37 +430,35 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
numSlotsAvailable.decrementAndGet();
- scheduled = true;
- } catch (RejectedExecutionException e) {
- if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) {
+ }
- if (isDebugEnabled) {
- LOG.debug("Preemption Queue: " + preemptionQueue);
- }
+ private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
+ if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) {
- TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+ if (isDebugEnabled) {
+ LOG.debug("Preemption Queue: " + preemptionQueue);
+ }
- // Avoid preempting tasks which are finishable - callback still to be processed.
- if (pRequest != null) {
- if (pRequest.getTaskRunnerCallable().canFinish()) {
- LOG.info(
- "Removed {} from preemption queue, but not preempting since it's now finishable",
- pRequest.getRequestId());
- } else {
- if (isInfoEnabled) {
- LOG.info("Invoking kill task for {} due to pre-emption to run {}",
- pRequest.getRequestId(), taskWrapper.getRequestId());
- }
- // The task will either be killed or is already in the process of completing, which will
- // trigger the next scheduling run, or result in available slots being higher than 0,
- // which will cause the scheduler loop to continue.
- pRequest.getTaskRunnerCallable().killTask();
+ TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+
+ // Avoid preempting tasks which are finishable - callback still to be processed.
+ if (pRequest != null) {
+ if (pRequest.getTaskRunnerCallable().canFinish()) {
+ LOG.info(
+ "Removed {} from preemption queue, but not preempting since it's now finishable",
+ pRequest.getRequestId());
+ } else {
+ if (isInfoEnabled) {
+ LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+ pRequest.getRequestId(), taskWrapper.getRequestId());
}
+ // The task will either be killed or is already in the process of completing, which will
+ // trigger the next scheduling run, or result in available slots being higher than 0,
+ // which will cause the scheduler loop to continue.
+ pRequest.getTaskRunnerCallable().killTask();
}
}
}
-
- return scheduled;
}
private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 38d42b9..7fb9a99 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -123,6 +123,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final Lock scheduleLock = new ReentrantLock();
private final Condition scheduleCondition = scheduleLock.newCondition();
+ private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false);
private final ListeningExecutorService schedulerExecutor;
private final SchedulerCallable schedulerCallable = new SchedulerCallable();
@@ -910,6 +911,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private void trySchedulingPendingTasks() {
scheduleLock.lock();
try {
+ pendingScheduleInvodations.set(true);
scheduleCondition.signal();
} finally {
scheduleLock.unlock();
@@ -924,7 +926,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
scheduleLock.lock();
try {
- scheduleCondition.await();
+ while (!pendingScheduleInvodations.get()) {
+ scheduleCondition.await();
+ }
} catch (InterruptedException e) {
if (isShutdown.get()) {
LOG.info("Scheduler thread interrupted after shutdown");
@@ -936,6 +940,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
} finally {
scheduleLock.unlock();
}
+
+ // Set pending to false since scheduling is about to run. Any triggers up to this point
+ // will be handled in the next run.
+ // A new request may come in right after this is set to false, but before the actual scheduling.
+ // This will be handled in this run, but will cause an immediate run after, which is harmless.
+ pendingScheduleInvodations.set(false);
// Schedule outside of the scheduleLock - which should only be used to wait on the condition.
schedulePendingTasks();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index ec1ffcf..38af07e 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -116,6 +116,7 @@ public class TaskExecutorTestHelpers {
private final ReentrantLock lock = new ReentrantLock();
private final Condition startedCondition = lock.newCondition();
private final Condition sleepCondition = lock.newCondition();
+ private boolean shouldSleep = true;
private final Condition finishedCondition = lock.newCondition();
public MockRequest(SubmitWorkRequestProto requestProto,
@@ -143,7 +144,9 @@ public class TaskExecutorTestHelpers {
lock.lock();
try {
- sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+ if (shouldSleep) {
+ sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+ }
} catch (InterruptedException e) {
wasInterrupted.set(true);
return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
@@ -171,6 +174,7 @@ public class TaskExecutorTestHelpers {
lock.lock();
try {
wasKilled.set(true);
+ shouldSleep = false;
sleepCondition.signal();
} finally {
lock.unlock();
@@ -192,6 +196,7 @@ public class TaskExecutorTestHelpers {
void complete() {
lock.lock();
try {
+ shouldSleep = false;
sleepCondition.signal();
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 34ab40a..cb2d0e9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -81,6 +83,7 @@ public class TestTaskExecutorService {
r2.awaitStart();
// Verify r1 was preempted. Also verify that it finished (single executor), otherwise
// r2 could have run anyway.
+ r1.awaitEnd();
assertTrue(r1.wasPreempted());
assertTrue(r1.hasFinished());
@@ -117,6 +120,9 @@ public class TestTaskExecutorService {
try {
taskExecutorService.schedule(r1);
+
+ // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
+ // This currently serves to allow the task to be removed from the waitQueue.
r1.awaitStart();
try {
taskExecutorService.schedule(r2);
@@ -154,6 +160,7 @@ public class TestTaskExecutorService {
assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId()));
r1.complete();
+ r1.awaitEnd();
icl1.awaitCompletion();
// Two known tasks left. r2 and r5. (r1 complete, r3 evicted, r4 rejected)
@@ -165,6 +172,7 @@ public class TestTaskExecutorService {
TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 =
taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId());
r5.complete();
+ r5.awaitEnd();
icl5.awaitCompletion();
// 1 Pending task which is not finishable
@@ -175,6 +183,7 @@ public class TestTaskExecutorService {
TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 =
taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId());
r2.complete();
+ r2.awaitEnd();
icl2.awaitCompletion();
// 0 Pending task which is not finishable
assertEquals(0, taskExecutorService.knownTasks.size());
@@ -187,6 +196,10 @@ public class TestTaskExecutorService {
private static class TaskExecutorServiceForTest extends TaskExecutorService {
+
+ private final Lock iclCreationLock = new ReentrantLock();
+ private final Map<String, Condition> iclCreationConditions = new HashMap<>();
+
public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
boolean enablePreemption) {
super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption);
@@ -196,13 +209,38 @@ public class TestTaskExecutorService {
@Override
InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) {
- InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper);
- completionListeners.put(taskWrapper.getRequestId(), icl);
- return icl;
+ iclCreationLock.lock();
+ try {
+ InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper);
+ completionListeners.put(taskWrapper.getRequestId(), icl);
+ Condition condition = iclCreationConditions.get(taskWrapper.getRequestId());
+ if (condition == null) {
+ condition = iclCreationLock.newCondition();
+ iclCreationConditions.put(taskWrapper.getRequestId(), condition);
+ }
+ condition.signalAll();
+ return icl;
+ } finally {
+ iclCreationLock.unlock();
+ }
}
- InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) {
- return completionListeners.get(requestId);
+ InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) throws
+ InterruptedException {
+ iclCreationLock.lock();
+ try {
+ Condition condition = iclCreationConditions.get(requestId);
+ if (condition == null) {
+ condition = iclCreationLock.newCondition();
+ iclCreationConditions.put(requestId, condition);
+ }
+ while (completionListeners.get(requestId) == null) {
+ condition.await();
+ }
+ return completionListeners.get(requestId);
+ } finally {
+ iclCreationLock.unlock();
+ }
}
private class InternalCompletionListenerForTest extends TaskExecutorService.InternalCompletionListener {
http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index ce60e6e..2f93266 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -343,9 +343,12 @@ public class TestLlapTaskSchedulerService {
ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+ controlScheduler(true);
ts.initialize();
ts.start();
// One scheduler pass from the nodes that are added at startup
+ signalSchedulerRun();
+ controlScheduler(false);
awaitSchedulerRun();
}
@@ -386,9 +389,9 @@ public class TestLlapTaskSchedulerService {
private AtomicBoolean controlScheduling = new AtomicBoolean(false);
private final Lock testLock = new ReentrantLock();
private final Condition schedulingCompleteCondition = testLock.newCondition();
- private final AtomicBoolean schedulingComplete = new AtomicBoolean(false);
+ private boolean schedulingComplete = false;
private final Condition triggerSchedulingCondition = testLock.newCondition();
- private final AtomicBoolean schedulingTriggered = new AtomicBoolean(false);
+ private boolean schedulingTriggered = false;
private final AtomicInteger numSchedulerRuns = new AtomicInteger(0);
@@ -402,7 +405,7 @@ public class TestLlapTaskSchedulerService {
testLock.lock();
try {
if (controlScheduling.get()) {
- while (!schedulingTriggered.get()) {
+ while (!schedulingTriggered) {
try {
triggerSchedulingCondition.await();
} catch (InterruptedException e) {
@@ -412,8 +415,8 @@ public class TestLlapTaskSchedulerService {
}
numSchedulerRuns.incrementAndGet();
super.schedulePendingTasks();
- schedulingTriggered.set(false);
- schedulingComplete.set(true);
+ schedulingTriggered = false;
+ schedulingComplete = true;
schedulingCompleteCondition.signal();
} finally {
testLock.unlock();
@@ -428,7 +431,7 @@ public class TestLlapTaskSchedulerService {
void forTestSignalSchedulingRun() throws InterruptedException {
testLock.lock();
try {
- schedulingTriggered.set(true);
+ schedulingTriggered = true;
triggerSchedulingCondition.signal();
} finally {
testLock.unlock();
@@ -438,10 +441,10 @@ public class TestLlapTaskSchedulerService {
void forTestAwaitSchedulingRun() throws InterruptedException {
testLock.lock();
try {
- while (!schedulingComplete.get()) {
+ while (!schedulingComplete) {
schedulingCompleteCondition.await();
}
- schedulingComplete.set(false);
+ schedulingComplete = false;
} finally {
testLock.unlock();
}