You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/22 02:04:04 UTC
git commit: TEZ-622. Failures in DelayedContainerManager can cause the AM to hang. (hitesh)
Updated Branches:
refs/heads/master 7b2a0e6f3 -> 8a8ba651b
TEZ-622. Failures in DelayedContainerManager can cause the AM to hang. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8a8ba651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8a8ba651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8a8ba651
Branch: refs/heads/master
Commit: 8a8ba651b9cb0d8f3b0f5039fc7a614fabbf0a2a
Parents: 7b2a0e6
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Nov 21 17:03:45 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Nov 21 17:03:45 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/TaskScheduler.java | 71 ++++++++++++--------
.../tez/mapreduce/examples/MRRSleepJob.java | 2 +-
2 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a8ba651/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index d8d2de3..67966b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -854,12 +854,17 @@ public class TaskScheduler extends AbstractService
} else {
// Don't attempt to delay containers if delay is 0.
HeldContainer heldContainer = heldContainers.get(container.getId());
- heldContainer.resetLocalityMatchLevel();
- long currentTime = System.currentTimeMillis();
- if (sessionDelay > 0) {
- heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+ if (heldContainer != null) {
+ heldContainer.resetLocalityMatchLevel();
+ long currentTime = System.currentTimeMillis();
+ if (sessionDelay > 0) {
+ heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+ }
+ assignedContainers = assignDelayedContainer(heldContainer);
+ } else {
+ LOG.info("Skipping container after task deallocate as container is"
+ + " no longer running, containerId=" + container.getId());
}
- assignedContainers = assignDelayedContainer(heldContainer);
}
}
}
@@ -1033,9 +1038,12 @@ public class TaskScheduler extends AbstractService
}
HeldContainer delayedContainer = heldContainers.remove(containerId);
if (delayedContainer != null) {
- Resources.subtractFrom(allocatedResources, delayedContainer.getContainer().getResource());
+ Resources.subtractFrom(allocatedResources,
+ delayedContainer.getContainer().getResource());
+ }
+ if (delayedContainer != null || !shouldReuseContainers) {
+ amRmClient.releaseAssignedContainer(containerId);
}
- amRmClient.releaseAssignedContainer(containerId);
if (assignedTask != null) {
// A task was assigned at some point. Add to release list since we are
// releasing the container.
@@ -1054,7 +1062,7 @@ public class TaskScheduler extends AbstractService
assert result == null;
containerAssignments.put(container.getId(), task);
HeldContainer heldContainer = heldContainers.get(container.getId());
- if (heldContainer == null) {
+ if (!shouldReuseContainers && heldContainer == null) {
heldContainers.put(container.getId(), new HeldContainer(container,
-1, -1, assigned));
Resources.addTo(allocatedResources, container.getResource());
@@ -1087,8 +1095,8 @@ public class TaskScheduler extends AbstractService
+ " is already held.");
}
Resources.addTo(allocatedResources, container.getResource());
- delayedContainerManager.addDelayedContainer(container,
- delayedContainerManager.maxScheduleTimeSeen + 1);
+ delayedContainerManager.addDelayedContainer(container,
+ delayedContainerManager.maxScheduleTimeSeen + 1);
}
}
delayedContainerManager.triggerScheduling(false);
@@ -1213,9 +1221,9 @@ public class TaskScheduler extends AbstractService
private void releaseUnassignedContainers(Iterable<Container> containers) {
for (Container container : containers) {
- releaseContainer(container.getId());
LOG.info("Releasing unused container: "
- + container);
+ + container.getId());
+ releaseContainer(container.getId());
}
}
@@ -1427,36 +1435,45 @@ public class TaskScheduler extends AbstractService
LOG.info("AllocatedContainerManager Thread interrupted");
}
} else {
- HeldContainer heldContainer = delayedContainers.peek();
- if (heldContainer == null) {
+ HeldContainer delayedContainer = delayedContainers.peek();
+ if (delayedContainer == null) {
continue;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Considering HeldContainer: " + heldContainer + " for assignment");
+ LOG.debug("Considering HeldContainer: "
+ + delayedContainer + " for assignment");
}
long currentTs = System.currentTimeMillis();
- long nextScheduleTs = heldContainer.getNextScheduleTime();
+ long nextScheduleTs = delayedContainer.getNextScheduleTime();
if (currentTs >= nextScheduleTs) {
// Remove the container and try scheduling it.
// TEZ-587 what if container is released by RM after this
// in onContainerCompleted()
- heldContainer = delayedContainers.poll();
- if (heldContainer == null) {
+ delayedContainer = delayedContainers.poll();
+ if (delayedContainer == null) {
continue;
}
- Map<CookieContainerRequest, Container> assignedContainers;
+ Map<CookieContainerRequest, Container> assignedContainers = null;
synchronized(TaskScheduler.this) {
- assignedContainers = assignDelayedContainer(heldContainer);
+ if (null !=
+ heldContainers.get(delayedContainer.getContainer().getId())) {
+ assignedContainers = assignDelayedContainer(delayedContainer);
+ } else {
+ LOG.info("Skipping delayed container as container is no longer"
+ + " running, containerId="
+ + delayedContainer.getContainer().getId());
+ }
}
+ // Inform App should be done outside of the lock
informAppAboutAssignments(assignedContainers);
} else {
synchronized(this) {
try {
// Wait for the next container to be assignable
- heldContainer = delayedContainers.peek();
+ delayedContainer = delayedContainers.peek();
long diff = localitySchedulingDelay;
- if (heldContainer != null) {
- diff = heldContainer.getNextScheduleTime() - currentTs;
+ if (delayedContainer != null) {
+ diff = delayedContainer.getNextScheduleTime() - currentTs;
}
if (diff > 0) {
this.wait(diff);
@@ -1526,11 +1543,9 @@ public class TaskScheduler extends AbstractService
long nextScheduleTime) {
HeldContainer delayedContainer = heldContainers.get(container.getId());
if (delayedContainer == null) {
- // Currently only already running containers are added to this. Hence
- // this condition should never occur.
- // Change this if and when all containers are handled by this construct.
- throw new TezUncheckedException("Attempting to add a non-running"
- + " container to the delayed container list");
+ LOG.warn("Attempting to add a non-running container to the"
+ + " delayed container list, containerId=" + container.getId());
+ return;
} else {
delayedContainer.setNextScheduleTime(nextScheduleTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a8ba651/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index edea15b..b02e87e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -733,7 +733,7 @@ public class MRRSleepJob extends Configured implements Tool {
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-irt intermediateReduceSleepTime]" +
" [-recordt recordSleepTime (msec)]" +
- " [-generateSplitsInAM (fale)/true]" +
+ " [-generateSplitsInAM (false)/true]" +
" [-writeSplitsToDfs (false)/true]");
ToolRunner.printGenericCommandUsage(System.err);
return 2;