You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/27 22:39:07 UTC
hive git commit: HIVE-18073 : AM may assert when its guaranteed task count is reduced (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 44ef59915 -> 2b730524f
HIVE-18073 : AM may assert when its guaranteed task count is reduced (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b730524
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b730524
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b730524
Branch: refs/heads/master
Commit: 2b730524f19420df120ec73ef972ad244ae380c1
Parents: 44ef599
Author: sergey <se...@apache.org>
Authored: Mon Nov 27 14:33:22 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Mon Nov 27 14:33:22 2017 -0800
----------------------------------------------------------------------
.../tezplugins/LlapTaskSchedulerService.java | 133 ++++++++++---------
1 file changed, 73 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2b730524/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66f3d2e..e97a267 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -431,21 +431,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
delta -= totalUpdated;
WM_LOG.info("Distributed " + totalUpdated);
}
- WM_LOG.info("Setting unused: " + unusedGuaranteed + " plus " + delta);
- unusedGuaranteed += delta;
+ int result = (unusedGuaranteed += delta);
+ WM_LOG.info("Setting unused to " + result + " based on remaining delta " + delta);
} else {
delta = -delta;
if (delta <= unusedGuaranteed) {
// Somebody took away our unwanted ducks.
- WM_LOG.info("Setting unused: " + unusedGuaranteed + " minus " + delta);
- unusedGuaranteed -= delta;
+ int result = (unusedGuaranteed -= delta);
+ WM_LOG.info("Setting unused to " + result + " based on full delta " + delta);
return;
} else {
delta -= unusedGuaranteed;
unusedGuaranteed = 0;
toUpdate = new ArrayList<>();
int totalUpdated = revokeGuaranteed(delta, null, toUpdate);
- WM_LOG.info("Unused is 0; revoked " + totalUpdated + " / " + delta);
+ WM_LOG.info("Setting unused to 0; revoked " + totalUpdated + " / " + delta);
// We must be able to take away the requisite number; if we can't, where'd the ducks go?
if (delta != totalUpdated) {
throw new AssertionError("Failed to revoke " + delta + " guaranteed tasks locally");
@@ -1109,7 +1109,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
List<TaskInfo> toUpdate = new ArrayList<>(1);
int updatedCount = distributeGuaranteed(1, null, toUpdate);
assert updatedCount <= 1;
- unusedGuaranteed += (1 - updatedCount);
+ if (updatedCount == 0) {
+ int result = ++unusedGuaranteed;
+ WM_LOG.info("Returning the unused duck; unused is now " + result);
+ }
if (toUpdate.isEmpty()) return null;
assert toUpdate.size() == 1;
return toUpdate.get(0);
@@ -1491,7 +1494,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
&& !removeFromRunningTaskMap(guaranteedTasks, task, taskInfo)) {
Preconditions.checkState(false, "runningTasks should contain an entry if the task" +
" was in running state. Caused by task: {}", task);
- // TODO: [!!!] is it possible that we are losing the guaranteed task here? dbl check paths
}
}
} else {
@@ -1543,7 +1545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@VisibleForTesting
protected void schedulePendingTasks() throws InterruptedException {
- Ref<TaskInfo> downgradedTask = null;
+ Ref<TaskInfo> downgradedTask = new Ref<>(null);
writeLock.lock();
try {
if (LOG.isDebugEnabled()) {
@@ -1565,31 +1567,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
dagStats.registerDelayedAllocation();
}
taskInfo.triedAssigningTask();
- if (unusedGuaranteed > 0) {
- synchronized (taskInfo) {
- assert !taskInfo.isPendingUpdate; // No updates before it's running.
- taskInfo.isGuaranteed = true;
- }
- --unusedGuaranteed;
- } else {
- // We could be scheduling a guaranteed task when a higher priority task cannot be
- // scheduled. Try to take a duck away from a lower priority task here.
- downgradedTask = new Ref<>(null);
- if (findGuaranteedToReallocate(taskInfo, downgradedTask)) {
- // We are revoking another duck; don't wait. We could also give the duck
- // to this task in the callback instead.
- synchronized (taskInfo) {
- assert !taskInfo.isPendingUpdate; // No updates before it's running.
- taskInfo.isGuaranteed = true;
- }
- // Note: after this, the method MUST send the downgrade message to downgradedTask
- // (outside of the writeLock, preferably), before exiting.
- }
- }
- ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource);
+ ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource, downgradedTask);
+ // Note: we must handle downgradedTask after this. We do it at the end, outside the lock.
if (LOG.isDebugEnabled()) {
- LOG.debug("ScheduleResult for Task: {} = {}", taskInfo,
- scheduleResult);
+ LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
}
if (scheduleResult == ScheduleResult.SCHEDULED) {
taskIter.remove();
@@ -1699,8 +1680,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
} finally {
writeLock.unlock();
}
- if (downgradedTask != null && downgradedTask.value != null) {
- WM_LOG.info("downgrading " + downgradedTask.value.attemptId);
+ if (downgradedTask.value != null) {
+ WM_LOG.info("Downgrading " + downgradedTask.value.attemptId);
checkAndSendGuaranteedStateUpdate(downgradedTask.value);
}
}
@@ -1722,38 +1703,70 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return sb.toString();
}
- private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) {
+ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource,
+ Ref<TaskInfo> downgradedTask) {
Preconditions.checkNotNull(totalResource, "totalResource can not be null");
// If there's no memory available, fail
if (totalResource.getMemory() <= 0) {
return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY.scheduleResult;
}
SelectHostResult selectHostResult = selectHost(taskInfo);
- if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
- NodeInfo nodeInfo = selectHostResult.nodeInfo;
- Container container =
- containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
- nodeInfo.getHost(),
- nodeInfo.getRpcPort(),
- nodeInfo.getServiceAddress());
- writeLock.lock(); // While updating local structures
- // Note: this is actually called under the epic writeLock in schedulePendingTasks
- try {
- // The canAccept part of this log message does not account for this allocation.
- assignedTaskCounter.incrementAndGet();
- LOG.info("Assigned #{}, task={} on node={}, to container={}",
- assignedTaskCounter.get(),
- taskInfo, nodeInfo.toShortString(), container.getId());
- dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
- nodeInfo.getHost());
- taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
- registerRunningTask(taskInfo);
- nodeInfo.registerTaskScheduled();
- } finally {
- writeLock.unlock();
- }
- getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
+ if (selectHostResult.scheduleResult != ScheduleResult.SCHEDULED) {
+ return selectHostResult.scheduleResult;
}
+ if (unusedGuaranteed > 0) {
+ boolean wasGuaranteed = false;
+ synchronized (taskInfo) {
+ assert !taskInfo.isPendingUpdate; // No updates before it's running.
+ wasGuaranteed = taskInfo.isGuaranteed;
+ taskInfo.isGuaranteed = true;
+ }
+ if (wasGuaranteed) {
+ // This should never happen - we only schedule one attempt once.
+ WM_LOG.error("The task had guaranteed flag set before scheduling: " + taskInfo);
+ } else {
+ int result = --unusedGuaranteed;
+ WM_LOG.info("Using an unused duck for " + taskInfo.attemptId
+ + "; unused is now " + result);
+ }
+ } else {
+ // We could be scheduling a guaranteed task when a higher priority task cannot be
+ // scheduled. Try to take a duck away from a lower priority task here.
+ if (findGuaranteedToReallocate(taskInfo, downgradedTask)) {
+ // We are revoking another duck; don't wait. We could also give the duck
+ // to this task in the callback instead.
+ synchronized (taskInfo) {
+ assert !taskInfo.isPendingUpdate; // No updates before it's running.
+ taskInfo.isGuaranteed = true;
+ }
+ // Note: after this, the caller MUST send the downgrade message to downgradedTask
+ // (outside of the writeLock, preferably), before exiting.
+ }
+ }
+
+ NodeInfo nodeInfo = selectHostResult.nodeInfo;
+ Container container =
+ containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
+ nodeInfo.getHost(),
+ nodeInfo.getRpcPort(),
+ nodeInfo.getServiceAddress());
+ writeLock.lock(); // While updating local structures
+ // Note: this is actually called under the epic writeLock in schedulePendingTasks
+ try {
+ // The canAccept part of this log message does not account for this allocation.
+ assignedTaskCounter.incrementAndGet();
+ LOG.info("Assigned #{}, task={} on node={}, to container={}",
+ assignedTaskCounter.get(),
+ taskInfo, nodeInfo.toShortString(), container.getId());
+ dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
+ nodeInfo.getHost());
+ taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
+ registerRunningTask(taskInfo);
+ nodeInfo.registerTaskScheduled();
+ } finally {
+ writeLock.unlock();
+ }
+ getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
return selectHostResult.scheduleResult;
}
@@ -2818,7 +2831,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
communicator.startUpdateGuaranteed(ti.attemptId, ti.assignedNode, newState, UPDATE_CALLBACK, ti);
}
-
+ @VisibleForTesting
int getUnusedGuaranteedCount() {
return unusedGuaranteed;
}