You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/03/05 22:58:25 UTC
git commit: TEZ-915. TaskScheduler can get hung when all headroom is used and it cannot utilize existing new containers (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 664046686 -> 18290c848
TEZ-915. TaskScheduler can get hung when all headroom is used and it cannot utilize existing new containers (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18290c84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18290c84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18290c84
Branch: refs/heads/master
Commit: 18290c8489df01f814087673ecdc6b140fbffc44
Parents: 6640466
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Mar 5 13:58:14 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Mar 5 13:58:14 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/TaskScheduler.java | 89 ++++++++++++++++++--
.../tez/dag/app/rm/TestTaskScheduler.java | 50 ++++++++++-
2 files changed, 127 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18290c84/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index c50e3a4..36c90ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -576,6 +576,7 @@ public class TaskScheduler extends AbstractService
Container containerToAssign = heldContainer.container;
+ heldContainer.incrementAssignmentAttempts();
// Each time a container is seen, we try node, rack and non-local in that
// order depending on matching level allowed
@@ -906,11 +907,6 @@ public class TaskScheduler extends AbstractService
" delayedContainers: " + delayedContainerManager.delayedContainers.size());
}
assert freeResources.getMemory() >= 0;
-
- if (delayedContainerManager.delayedContainers.size() > 0) {
- // if we are holding onto containers then nothing to preempt from outside
- return;
- }
CookieContainerRequest highestPriRequest = null;
for(CookieContainerRequest request : taskRequests.values()) {
@@ -926,6 +922,74 @@ public class TaskScheduler extends AbstractService
// highest priority request will not fit in existing free resources
// free up some more
// TODO this is subject to error wrt RM resource normalization
+
+ // This request must have been considered for matching with all existing
+ // containers when request was made.
+ Container lowestPriNewContainer = null;
+ // could not find anything to preempt. Check if we can release unused
+ // containers
+ for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
+ if (!heldContainer.isNew()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+ + heldContainer.getContainer().getId());
+ }
+ return;
+ }
+ if (heldContainer.geNumAssignmentAttempts() < 3) {
+ // we havent tried to assign this container at node/rack/ANY
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Brand new container. Wait for assignment loop to match it. "
+ + heldContainer.getContainer().getId());
+ }
+ return;
+ }
+ Container container = heldContainer.getContainer();
+ if (lowestPriNewContainer == null ||
+ isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
+ // there is a lower priority new container
+ lowestPriNewContainer = container;
+ }
+ }
+
+ if (lowestPriNewContainer != null) {
+ LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
+ " with priority: " + lowestPriNewContainer.getPriority() +
+ " to free resource for request: " + highestPriRequest +
+ " . Current free resources: " + freeResources);
+ releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
+ // We are returning an unused resource back the RM. The RM thinks it
+ // has serviced our initial request and will not re-allocate this back
+ // to us anymore. So we need to ask for this again. If there is no
+ // outstanding request at that priority then its fine to not ask again.
+ // See TEZ-915 for more details
+ for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
+ Object task = entry.getKey();
+ CookieContainerRequest request = entry.getValue();
+ if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
+ LOG.info("Resending request for task again: " + task);
+ deallocateTask(task, true);
+ allocateTask(task, request.getCapability(),
+ (request.getNodes() == null ? null :
+ request.getNodes().toArray(new String[request.getNodes().size()])),
+ (request.getRacks() == null ? null :
+ request.getRacks().toArray(new String[request.getRacks().size()])),
+ request.getPriority(),
+ request.getCookie().getContainerSignature(),
+ request.getCookie().getAppCookie());
+ break;
+ }
+ }
+
+ return;
+ }
+
+ // this assert will be a no-op in production but can help identify
+ // invalid assumptions during testing
+ assert delayedContainerManager.delayedContainers.isEmpty();
+
+ // there are no reused or new containers to release
+ // try to preempt running containers
Map.Entry<Object, Container> preemptedEntry = null;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
@@ -1265,7 +1329,6 @@ public class TaskScheduler extends AbstractService
if (blacklistedNodes.contains(container.getNodeId())) {
CookieContainerRequest request = entry.getKey();
Object task = getTask(request);
- Object clientCookie = request.getCookie().getAppCookie();
LOG.info("Container: " + container.getId() +
" allocated on blacklisted node: " + container.getNodeId() +
" for task: " + task);
@@ -1280,7 +1343,8 @@ public class TaskScheduler extends AbstractService
(request.getRacks() == null ? null :
request.getRacks().toArray(new String[request.getRacks().size()])),
request.getPriority(),
- request.getCookie().getContainerSignature(), clientCookie);
+ request.getCookie().getContainerSignature(),
+ request.getCookie().getAppCookie());
} else {
informAppAboutAssignment(entry.getKey(), container);
}
@@ -1433,7 +1497,7 @@ public class TaskScheduler extends AbstractService
}
}
- private PriorityBlockingQueue<HeldContainer> delayedContainers =
+ PriorityBlockingQueue<HeldContainer> delayedContainers =
new PriorityBlockingQueue<HeldContainer>(20,
new HeldContainerTimerComparator());
@@ -1672,6 +1736,7 @@ public class TaskScheduler extends AbstractService
private LocalityMatchLevel localityMatchLevel;
private long containerExpiryTime;
private CookieContainerRequest lastTaskInfo;
+ private int numAssignmentAttempts = 0;
HeldContainer(Container container,
long nextScheduleTime,
@@ -1691,6 +1756,14 @@ public class TaskScheduler extends AbstractService
return firstContainerSignature == null;
}
+ int geNumAssignmentAttempts() {
+ return numAssignmentAttempts;
+ }
+
+ void incrementAssignmentAttempts() {
+ numAssignmentAttempts++;
+ }
+
public Container getContainer() {
return this.container;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18290c84/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 63cc476..64a0b0c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -66,6 +66,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.TaskScheduler.HeldContainer;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
@@ -1046,6 +1047,7 @@ public class TestTaskScheduler {
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
+ Assert.assertEquals(3, scheduler.taskAllocations.size());
Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());
Assert.assertEquals(mockCId1,
scheduler.taskAllocations.get(mockTask1).getId());
@@ -1059,19 +1061,59 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ Object mockTask3WaitCookie = new Object();
scheduler.allocateTask(mockTask3Wait, taskAsk, null,
- null, pri6, obj3, null);
+ null, pri6, obj3, mockTask3WaitCookie);
// no preemption - same pri
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
-
+
+ Priority pri8 = Priority.newInstance(8);
+ Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer4.getNodeId().getHost()).thenReturn("host1");
+ when(mockContainer4.getResource()).thenReturn(taskAsk);
+ when(mockContainer4.getPriority()).thenReturn(pri8);
+ ContainerId mockCId4 = mock(ContainerId.class);
+ when(mockContainer4.getId()).thenReturn(mockCId4);
+ containers.clear();
+ containers.add(mockContainer4);
+
+ // Fudge new container being present in delayed allocation list due to race
+ HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null);
+ scheduler.delayedContainerManager.delayedContainers.add(heldContainer);
+ // no preemption - container assignment attempts < 3
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ heldContainer.incrementAssignmentAttempts();
+ // no preemption - container assignment attempts < 3
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ heldContainer.incrementAssignmentAttempts();
+ heldContainer.incrementAssignmentAttempts();
+ // preemption - container released and resource asked again
+ scheduler.getProgress();
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+ verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4);
+ verify(mockRMClient, times(4)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest reAdded = requestCaptor.getValue();
+ Assert.assertEquals(pri6, reAdded.getPriority());
+ Assert.assertEquals(taskAsk, reAdded.getCapability());
+ Assert.assertEquals(mockTask3WaitCookie, reAdded.getCookie().getAppCookie());
+
+ // remove fudging.
+ scheduler.delayedContainerManager.delayedContainers.clear();
+
scheduler.allocateTask(mockTask3Retry, taskAsk, null,
null, pri5, obj3, null);
// no preemption - higher pri. exact match
scheduler.getProgress();
drainableAppCallback.drain();
- verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
scheduler.allocateTask(mockTask2, taskAsk, null,
null, pri4, null, null);
@@ -1080,7 +1122,7 @@ public class TestTaskScheduler {
// mockTaskPri3Kill gets preempted
scheduler.getProgress();
drainableAppCallback.drain();
- verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+ verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3);
AppFinalStatus finalStatus =