You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/03/05 22:58:25 UTC

git commit: TEZ-915. TaskScheduler can get hung when all headroom is used and it cannot utilize existing new containers (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 664046686 -> 18290c848


TEZ-915. TaskScheduler can get hung when all headroom is used and it cannot utilize existing new containers (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18290c84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18290c84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18290c84

Branch: refs/heads/master
Commit: 18290c8489df01f814087673ecdc6b140fbffc44
Parents: 6640466
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Mar 5 13:58:14 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Mar 5 13:58:14 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 89 ++++++++++++++++++--
 .../tez/dag/app/rm/TestTaskScheduler.java       | 50 ++++++++++-
 2 files changed, 127 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18290c84/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index c50e3a4..36c90ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -576,6 +576,7 @@ public class TaskScheduler extends AbstractService
 
       Container containerToAssign = heldContainer.container;
 
+      heldContainer.incrementAssignmentAttempts();
       // Each time a container is seen, we try node, rack and non-local in that
       // order depending on matching level allowed
 
@@ -906,11 +907,6 @@ public class TaskScheduler extends AbstractService
           " delayedContainers: " + delayedContainerManager.delayedContainers.size());
       }
       assert freeResources.getMemory() >= 0;
-      
-      if (delayedContainerManager.delayedContainers.size() > 0) {
-        // if we are holding onto containers then nothing to preempt from outside
-        return;
-      }
   
       CookieContainerRequest highestPriRequest = null;
       for(CookieContainerRequest request : taskRequests.values()) {
@@ -926,6 +922,74 @@ public class TaskScheduler extends AbstractService
         // highest priority request will not fit in existing free resources
         // free up some more
         // TODO this is subject to error wrt RM resource normalization
+        
+        // This request must have been considered for matching with all existing 
+        // containers when request was made.
+        Container lowestPriNewContainer = null;
+        // Before preempting running containers, check if we can release 
+        // unused new containers that are being held
+        for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
+          if (!heldContainer.isNew()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+                  + heldContainer.getContainer().getId());
+            }
+            return;
+          }
+          if (heldContainer.geNumAssignmentAttempts() < 3) {
+            // we haven't yet tried to assign this container at each of node/rack/ANY
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Brand new container. Wait for assignment loop to match it. "
+                  + heldContainer.getContainer().getId());
+            }
+            return;
+          }
+          Container container = heldContainer.getContainer();
+          if (lowestPriNewContainer == null ||
+              isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
+            // there is a lower priority new container
+            lowestPriNewContainer = container;
+          }
+        }
+        
+        if (lowestPriNewContainer != null) {
+          LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
+              " with priority: " + lowestPriNewContainer.getPriority() + 
+              " to free resource for request: " + highestPriRequest +
+              " . Current free resources: " + freeResources);
+          releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
+          // We are returning an unused resource back to the RM. The RM thinks it 
+          // has serviced our initial request and will not re-allocate this back
+          // to us anymore. So we need to ask for this again. If there is no
+          // outstanding request at that priority then it's fine to not ask again.
+          // See TEZ-915 for more details
+          for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
+            Object task = entry.getKey();
+            CookieContainerRequest request = entry.getValue();
+            if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
+              LOG.info("Resending request for task again: " + task);
+              deallocateTask(task, true);
+              allocateTask(task, request.getCapability(), 
+                  (request.getNodes() == null ? null : 
+                    request.getNodes().toArray(new String[request.getNodes().size()])), 
+                    (request.getRacks() == null ? null : 
+                      request.getRacks().toArray(new String[request.getRacks().size()])), 
+                    request.getPriority(), 
+                    request.getCookie().getContainerSignature(),
+                    request.getCookie().getAppCookie());
+              break;
+            }
+          }
+          
+          return;
+        }
+        
+        // this assert will be a no-op in production but can help identify 
+        // invalid assumptions during testing
+        assert delayedContainerManager.delayedContainers.isEmpty();
+        
+        // there are no reused or new containers to release
+        // try to preempt running containers
         Map.Entry<Object, Container> preemptedEntry = null;
         for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
           HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
@@ -1265,7 +1329,6 @@ public class TaskScheduler extends AbstractService
       if (blacklistedNodes.contains(container.getNodeId())) {
         CookieContainerRequest request = entry.getKey();
         Object task = getTask(request);
-        Object clientCookie = request.getCookie().getAppCookie();
         LOG.info("Container: " + container.getId() + 
             " allocated on blacklisted node: " + container.getNodeId() + 
             " for task: " + task);
@@ -1280,7 +1343,8 @@ public class TaskScheduler extends AbstractService
             (request.getRacks() == null ? null : 
               request.getRacks().toArray(new String[request.getRacks().size()])), 
             request.getPriority(), 
-            request.getCookie().getContainerSignature(), clientCookie);
+            request.getCookie().getContainerSignature(), 
+            request.getCookie().getAppCookie());
       } else {
         informAppAboutAssignment(entry.getKey(), container);
       }
@@ -1433,7 +1497,7 @@ public class TaskScheduler extends AbstractService
       }
     }
 
-    private PriorityBlockingQueue<HeldContainer> delayedContainers =
+    PriorityBlockingQueue<HeldContainer> delayedContainers =
       new PriorityBlockingQueue<HeldContainer>(20,
         new HeldContainerTimerComparator());
 
@@ -1672,6 +1736,7 @@ public class TaskScheduler extends AbstractService
     private LocalityMatchLevel localityMatchLevel;
     private long containerExpiryTime;
     private CookieContainerRequest lastTaskInfo;
+    private int numAssignmentAttempts = 0;
     
     HeldContainer(Container container,
         long nextScheduleTime,
@@ -1691,6 +1756,14 @@ public class TaskScheduler extends AbstractService
       return firstContainerSignature == null;
     }
     
+    int geNumAssignmentAttempts() {
+      return numAssignmentAttempts;
+    }
+    
+    void incrementAssignmentAttempts() {
+      numAssignmentAttempts++;
+    }
+    
     public Container getContainer() {
       return this.container;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18290c84/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 63cc476..64a0b0c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -66,6 +66,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.TaskScheduler.HeldContainer;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
@@ -1046,6 +1047,7 @@ public class TestTaskScheduler {
     
     scheduler.onContainersAllocated(containers);
     drainableAppCallback.drain();
+    Assert.assertEquals(3, scheduler.taskAllocations.size());
     Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());
     Assert.assertEquals(mockCId1,
         scheduler.taskAllocations.get(mockTask1).getId());
@@ -1059,19 +1061,59 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
 
+    Object mockTask3WaitCookie = new Object();
     scheduler.allocateTask(mockTask3Wait, taskAsk, null,
-                           null, pri6, obj3, null);
+                           null, pri6, obj3, mockTask3WaitCookie);
     // no preemption - same pri
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
-
+    
+    Priority pri8 = Priority.newInstance(8);
+    Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer4.getNodeId().getHost()).thenReturn("host1");
+    when(mockContainer4.getResource()).thenReturn(taskAsk);
+    when(mockContainer4.getPriority()).thenReturn(pri8);
+    ContainerId mockCId4 = mock(ContainerId.class);
+    when(mockContainer4.getId()).thenReturn(mockCId4);
+    containers.clear();
+    containers.add(mockContainer4);
+    
+    // Fudge new container being present in delayed allocation list due to race
+    HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null);
+    scheduler.delayedContainerManager.delayedContainers.add(heldContainer);
+    // no preemption - container assignment attempts < 3
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    heldContainer.incrementAssignmentAttempts();
+    // no preemption - container assignment attempts < 3
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    heldContainer.incrementAssignmentAttempts();
+    heldContainer.incrementAssignmentAttempts();
+    // preemption - container released and resource asked again
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4);
+    verify(mockRMClient, times(4)).
+    addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest reAdded = requestCaptor.getValue();
+    Assert.assertEquals(pri6, reAdded.getPriority());
+    Assert.assertEquals(taskAsk, reAdded.getCapability());
+    Assert.assertEquals(mockTask3WaitCookie, reAdded.getCookie().getAppCookie());
+    
+    // remove fudging.
+    scheduler.delayedContainerManager.delayedContainers.clear();
+    
     scheduler.allocateTask(mockTask3Retry, taskAsk, null,
                            null, pri5, obj3, null);
     // no preemption - higher pri. exact match
     scheduler.getProgress();
     drainableAppCallback.drain();
-    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
 
     scheduler.allocateTask(mockTask2, taskAsk, null,
                            null, pri4, null, null);
@@ -1080,7 +1122,7 @@ public class TestTaskScheduler {
     // mockTaskPri3Kill gets preempted
     scheduler.getProgress();
     drainableAppCallback.drain();
-    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3);
 
     AppFinalStatus finalStatus =