You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/19 23:09:18 UTC
hive git commit: HIVE-10756. LLAP: Misc changes to daemon scheduling. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 41d123412 -> f0175bc8d
HIVE-10756. LLAP: Misc changes to daemon scheduling. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f0175bc8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f0175bc8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f0175bc8
Branch: refs/heads/llap
Commit: f0175bc8de1a585903fee71f280812983117f299
Parents: 41d1234
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 14:08:13 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 14:08:13 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/impl/ContainerRunnerImpl.java | 2 -
.../impl/EvictingPriorityBlockingQueue.java | 4 +-
.../llap/daemon/impl/TaskExecutorService.java | 60 +++++++++++--------
.../llap/daemon/impl/TaskRunnerCallable.java | 62 ++++++++++----------
4 files changed, 70 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e544789..3fd7920 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -67,7 +67,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
private final QueryTracker queryTracker;
private final Scheduler<TaskRunnerCallable> executorService;
private final AtomicReference<InetSocketAddress> localAddress;
- private final String[] localDirsBase;
private final Map<String, String> localEnv = new HashMap<>();
private final long memoryPerExecutor;
private final LlapDaemonExecutorMetrics metrics;
@@ -87,7 +86,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
this.conf = conf;
Preconditions.checkState(numExecutors > 0,
"Invalid number of executors: " + numExecutors + ". Must be > 0");
- this.localDirsBase = localDirsBase;
this.localAddress = localAddress;
this.queryTracker = new QueryTracker(conf, localDirsBase);
http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index e8d789b..101a69c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -27,8 +27,8 @@ import java.util.Comparator;
* returned back. If the queue is not full, new element will be added to queue and null is returned.
*/
public class EvictingPriorityBlockingQueue<E> {
- private PriorityBlockingDeque<E> deque;
- private Comparator<E> comparator;
+ private final PriorityBlockingDeque<E> deque;
+ private final Comparator<E> comparator;
public EvictingPriorityBlockingQueue(Comparator<E> comparator, int maxSize) {
this.deque = new PriorityBlockingDeque<>(comparator, maxSize);
http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 08af1e2..5323f05 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -71,9 +71,13 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
// some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items
// to wait queue
private final Object waitLock;
+ // Thread pool for actual execution of work.
private final ListeningExecutorService executorService;
private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue;
+ // Thread pool for taking entities off the wait queue.
private final ListeningExecutorService waitQueueExecutorService;
+ // Thread pool for callbacks on completion of execution of a work unit.
+ private final ListeningExecutorService executionCompletionExecutorService;
private final BlockingQueue<TaskRunnerCallable> preemptionQueue;
private final boolean enablePreemption;
private final ThreadPoolExecutor threadPoolExecutor;
@@ -94,9 +98,14 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
this.numSlotsAvailable = new AtomicInteger(numExecutors);
// single threaded scheduler for tasks from wait queue to executor threads
- ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+ ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build());
this.waitQueueExecutorService = MoreExecutors.listeningDecorator(wes);
+
+ ExecutorService executionCompletionExecutorServiceRaw = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread #%d")
+ .build());
+ executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw);
ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
Futures.addCallback(future, new WaitQueueWorkerCallback());
}
@@ -125,8 +134,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
// if the task cannot finish and if no slots are available then don't schedule it.
// TODO: Event notifications that change canFinish state should notify waitLock
synchronized (waitLock) {
+ // KKK Is this a tight loop when there's only finishable tasks available ?
if (!task.canFinish() && numSlotsAvailable.get() == 0) {
waitLock.wait();
+ // Another task at a higher priority may have come in during the wait. Lookup the
+ // queue again to pick up the task at the highest priority.
+ continue;
}
}
@@ -190,7 +203,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
try {
ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task);
- Futures.addCallback(future, wrappedCallback);
+ // Callback on a separate thread so that when a task completes, the thread in the main queue
+ // is actually available for execution and will not potentially result in a RejectedExecution
+ Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
if (isInfoEnabled) {
LOG.info(task.getRequestId() + " scheduled for execution.");
@@ -216,13 +231,15 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
}
TaskRunnerCallable pRequest = preemptionQueue.remove();
- if (pRequest != null && !pRequest.isCompleted() && !pRequest.isKillInvoked()) {
+ if (pRequest != null) {
if (isInfoEnabled) {
- LOG.info("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption");
+ LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId());
}
- pRequest.setKillInvoked();
+ // The task will either be killed or is already in the process of completing, which will
+ // trigger the next scheduling run, or result in available slots being higher than 0,
+ // which will cause the scheduler loop to continue.
pRequest.killTask();
}
}
@@ -241,14 +258,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
@Override
public void onSuccess(TaskRunner2Result result) {
- task.setCompleted();
task.getCallback().onSuccess(result);
updatePreemptionListAndNotify(result.getEndReason());
}
@Override
public void onFailure(Throwable t) {
- task.setCompleted();
task.getCallback().onFailure(t);
updatePreemptionListAndNotify(null);
LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
@@ -282,23 +297,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
" service gracefully");
}
- executorService.shutdown();
- try {
- if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
- executorService.shutdownNow();
- }
- } catch (InterruptedException e) {
- executorService.shutdownNow();
- }
-
- waitQueueExecutorService.shutdown();
- try {
- if (!waitQueueExecutorService.awaitTermination(1, TimeUnit.MINUTES)) {
- waitQueueExecutorService.shutdownNow();
- }
- } catch (InterruptedException e) {
- waitQueueExecutorService.shutdownNow();
- }
+ shutdownExecutor(waitQueueExecutorService);
+ shutdownExecutor(executorService);
+ shutdownExecutor(executionCompletionExecutorService);
} else {
if (isDebugEnabled) {
LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
@@ -309,6 +310,17 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
}
}
+ private void shutdownExecutor(ExecutorService executorService) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executorService.shutdownNow();
+ }
+ }
+
@VisibleForTesting
public static class WaitQueueComparator implements Comparator<TaskRunnerCallable> {
http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index e505070..d1b1c61 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -102,8 +102,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private boolean shouldRunTask = true;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
- private AtomicBoolean isCompleted;
- private AtomicBoolean killInvoked;
+ private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+ private final AtomicBoolean killInvoked = new AtomicBoolean(false);
TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf,
ExecutionContext executionContext, Map<String, String> envMap,
@@ -133,24 +133,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.metrics = metrics;
this.requestId = getTaskAttemptId(request);
this.killedTaskHandler = killedTaskHandler;
- this.isCompleted = new AtomicBoolean(false);
- this.killInvoked = new AtomicBoolean(false);
- }
-
- public void setCompleted() {
- isCompleted.set(true);
- }
-
- public boolean isCompleted() {
- return isCompleted.get();
- }
-
- public boolean isKillInvoked() {
- return killInvoked.get();
- }
-
- public void setKillInvoked() {
- killInvoked.set(true);
}
@Override
@@ -226,6 +208,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
if (result.isContainerShutdownRequested()) {
LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
}
+ isCompleted.set(true);
return result;
} finally {
@@ -242,21 +225,38 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
/**
* Attempt to kill a running task. If the task has not started running, it will not start.
* If it's already running, a kill request will be sent to it.
- *
+ * <p/>
* The AM will be informed about the task kill.
*/
public void killTask() {
- synchronized (this) {
- LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null));
- if (taskRunner != null) {
- killtimerWatch.start();
- LOG.info("Issuing kill to task {}" + taskSpec.getTaskAttemptID());
- taskRunner.killTask();
- shouldRunTask = false;
+ if (!isCompleted.get()) {
+ if (!killInvoked.getAndSet(true)) {
+ synchronized (this) {
+ LOG.info("Kill task requested for id={}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(),
+ (taskRunner != null));
+ if (taskRunner != null) {
+ killtimerWatch.start();
+ LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
+ boolean killed = taskRunner.killTask();
+ if (killed) {
+ // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
+ reportTaskKilled();
+ } else {
+ LOG.info("Kill request for task {} did not complete because the task is already complete",
+ taskSpec.getTaskAttemptID());
+ }
+ shouldRunTask = false;
+ }
+ }
+ } else {
+ // This should not happen.
+ LOG.warn("Ignoring kill request for task {} since a previous kill request was processed",
+ taskSpec.getTaskAttemptID());
}
+ } else {
+ LOG.info("Ignoring kill request for task {} since it's already complete",
+ taskSpec.getTaskAttemptID());
}
- // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
- reportTaskKilled();
}
/**
@@ -382,6 +382,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// via a kill message when a task kill is requested by the daemon.
@Override
public void onSuccess(TaskRunner2Result result) {
+ isCompleted.set(true);
switch(result.getEndReason()) {
// Only the KILLED case requires a message to be sent out to the AM.
case SUCCESS:
@@ -424,6 +425,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public void onFailure(Throwable t) {
+ isCompleted.set(true);
LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t);
// TODO HIVE-10236 Report a fatal error over the umbilical
taskRunnerCallable.shutdown();