You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/08 09:31:25 UTC

git commit: TEZ-588. TaskScheduler may end up releasing containers and getting stuck without enough resources (bikas)

Updated Branches:
  refs/heads/master 5c217f3c5 -> ae541f78d


TEZ-588. TaskScheduler may end up releasing containers and getting stuck without enough resources (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae541f78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae541f78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae541f78

Branch: refs/heads/master
Commit: ae541f78de02fb7418ae2955bf8da9557e9901c6
Parents: 5c217f3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 8 00:28:03 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 8 00:28:03 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  50 ++-
 .../tez/dag/app/rm/TezAMRMClientAsync.java      |   9 +
 .../tez/dag/app/rm/TestContainerReuse.java      |  21 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 303 ++++++++++++++++++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  11 +
 5 files changed, 373 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 4ea9c74..ccc4a94 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -539,7 +539,8 @@ public class TaskScheduler extends AbstractService
       long currentTime = System.currentTimeMillis();
       if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
           && sessionDelay != -1)) {
-        LOG.info("Container's session delay expired or is new. Releasing container"
+        LOG.info("No taskRequests. Container's session delay expired or is new. " +
+        	"Releasing container"
           + ", containerId=" + heldContainer.container.getId()
           + ", containerExpiryTime="
           + heldContainer.getContainerExpiryTime()
@@ -626,7 +627,8 @@ public class TaskScheduler extends AbstractService
         long currentTime = System.currentTimeMillis();
 
         // Release container if final expiry time is reached
-        if (heldContainer.getContainerExpiryTime() <= currentTime
+        // Dont release a new container. The RM may not give us new ones
+        if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
           && sessionDelay != -1) {
           LOG.info("Container's session delay expired. Releasing container"
             + ", containerId=" + heldContainer.container.getId()
@@ -660,11 +662,24 @@ public class TaskScheduler extends AbstractService
               hitFinalMatchLevel = false;
             }
           }
-
+          
           if (hitFinalMatchLevel) {
+            boolean safeToRelease = true;
+            Priority topPendingPriority = amRmClient.getTopPriority();
+            Priority containerPriority = heldContainer.container.getPriority();
+            if (topPendingPriority != null && 
+                containerPriority.compareTo(topPendingPriority) < 0) {
+              // this container is of lower priority and given to us by the RM for
+              // a task that will be matched after the current top priority. Keep 
+              // this container for those pending tasks since the RM is not going
+              // to give this container to us again
+              safeToRelease = false;
+            }
+            
             // Are there any pending requests at any priority?
             // release if there are tasks or this is not a session
-            if (!taskRequests.isEmpty() || !appContext.isSession()) {
+            if (safeToRelease && 
+                (!taskRequests.isEmpty() || !appContext.isSession())) {
               LOG.info("Releasing held container as either there are pending but "
                 + " unmatched requests or this is not a session"
                 + ", containerId=" + heldContainer.container.getId()
@@ -1151,6 +1166,31 @@ public class TaskScheduler extends AbstractService
     Map<CookieContainerRequest, Container> assignedContainers,
     boolean honorLocality) {
 
+    Priority containerPriority = container.getPriority();
+    Priority topPendingTaskPriority = amRmClient.getTopPriority();
+    if (topPendingTaskPriority == null) {
+      // nothing left to assign
+      return false;
+    }
+    
+    if (topPendingTaskPriority.compareTo(containerPriority) > 0) {
+      // if the next task to assign is higher priority than the container then 
+      // dont assign this container to that task.
+      // if task and container are equal priority - then its first use or reuse
+      // within the same priority - safe to use
+      // if task is lower priority than container then its we use a container that
+      // is no longer needed by higher priority tasks All those higher pri tasks 
+      // have been assigned resources - safe to use (first use or reuse)
+      // if task is higher priority than container then we may end up using a 
+      // container that was assigned by the RM for a lower priority pending task 
+      // that will be assigned after this higher priority task is assigned. If we
+      // use that task's container now then we may not be able to match this 
+      // container to that task later on. However the RM has already assigned us 
+      // all containers and is not going to give us new containers. We will get 
+      // stuck for resources.
+      return false;
+    }
+    
     CookieContainerRequest assigned =
       assigner.assignReUsedContainer(container, honorLocality);
     if (assigned != null) {
@@ -1163,7 +1203,7 @@ public class TaskScheduler extends AbstractService
   private void releaseUnassignedContainers(Iterable<Container> containers) {
     for (Container container : containers) {
       releaseContainer(container.getId());
-      LOG.info("Releasing container, No RM requests matching container: "
+      LOG.info("Releasing unused container: "
           + container);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index 5d4e0bc..487c57a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -71,6 +71,15 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
       AMRMClientAsync.CallbackHandler callbackHandler) {
     super(client, intervalMs, callbackHandler);
   }
+  
+  // Returns the greatest key of knownRequestsByPriority, or null if it is empty.
+  public synchronized Priority getTopPriority() {
+    if (knownRequestsByPriority.isEmpty()) {
+      return null;
+    }
+    // first key of the descending view == last key of the map's ordering
+    return knownRequestsByPriority.lastKey();
+  }
 
   @Override
   public synchronized void addContainerRequest(T req) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index d14c74f..4e30763 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -84,15 +84,6 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 
 public class TestContainerReuse {
-
-  void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
-      throws InterruptedException {
-    while (!drainNotifier.get()) {
-      synchronized (drainNotifier) {
-        drainNotifier.wait();
-      }
-    }
-  }
   
   @Test(timeout = 15000l)
   public void testDelayedReuseContainerBecomesAvailable()
@@ -190,7 +181,7 @@ public class TestContainerReuse {
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(
       Lists.newArrayList(containerHost1, containerHost2));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(containerHost1));
@@ -319,7 +310,7 @@ public class TestContainerReuse {
 
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
@@ -424,7 +415,7 @@ public class TestContainerReuse {
     // One container allocated.
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
 
@@ -461,7 +452,7 @@ public class TestContainerReuse {
     // Second container allocated. Should be allocated to the last task.
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
 
@@ -562,7 +553,7 @@ public class TestContainerReuse {
     // One container allocated.
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(container1));
@@ -688,7 +679,7 @@ public class TestContainerReuse {
     // One container allocated.
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
-    waitForDelayedDrainNotify(drainNotifier);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta11), any(Object.class), eq(container1));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 1cffd1d..1ae7d70 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -40,6 +40,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -72,6 +74,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -83,7 +86,7 @@ public class TestTaskScheduler {
 
   @SuppressWarnings({ "unchecked" })
   @Test
-  public void testTaskScheduler() throws Exception {
+  public void testTaskSchedulerNoReuse() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
     AppContext mockAppContext = mock(AppContext.class);
@@ -307,6 +310,304 @@ public class TestTaskScheduler {
 
     // deallocate allocated task
     assertTrue(scheduler.deallocateTask(mockTask1, true));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId1);
+    verify(mockRMClient).releaseAssignedContainer(mockCId1);
+    // deallocate allocated container
+    Assert.assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
+    drainableAppCallback.drain();
+    verify(mockRMClient).releaseAssignedContainer(mockCId2);
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+    List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+    ContainerStatus mockStatus1 = mock(ContainerStatus.class);
+    when(mockStatus1.getContainerId()).thenReturn(mockCId1);
+    statuses.add(mockStatus1);
+    ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+    when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+    statuses.add(mockStatus2);
+    ContainerStatus mockStatus3 = mock(ContainerStatus.class);
+    when(mockStatus3.getContainerId()).thenReturn(mockCId3);
+    statuses.add(mockStatus3);
+    ContainerStatus mockStatus4 = mock(ContainerStatus.class);
+    when(mockStatus4.getContainerId()).thenReturn(mockCId4);
+    statuses.add(mockStatus4);
+
+    scheduler.onContainersCompleted(statuses);
+    drainableAppCallback.drain();
+    // released container status returned
+    verify(mockApp).containerCompleted(mockTask1, mockStatus1);
+    verify(mockApp).containerCompleted(mockTask2, mockStatus2);
+    // currently allocated container status returned and not released
+    verify(mockApp).containerCompleted(mockTask3, mockStatus3);
+    // no other statuses returned
+    verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+
+    float progress = 0.5f;
+    when(mockApp.getProgress()).thenReturn(progress);
+    Assert.assertEquals(progress, scheduler.getProgress(), 0);
+
+    List<NodeReport> mockUpdatedNodes = mock(List.class);
+    scheduler.onNodesUpdated(mockUpdatedNodes);
+    drainableAppCallback.drain();
+    verify(mockApp).nodesUpdated(mockUpdatedNodes);
+
+    Exception mockException = mock(Exception.class);
+    scheduler.onError(mockException);
+    drainableAppCallback.drain();
+    verify(mockApp).onError(mockException);
+
+    scheduler.onShutdownRequest();
+    drainableAppCallback.drain();
+    verify(mockApp).appShutdownRequested();
+
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.stop();
+    drainableAppCallback.drain();
+    verify(mockRMClient).
+                  unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+                                              appMsg, appUrl);
+    verify(mockRMClient).stop();
+    scheduler.close();
+  }
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testTaskSchedulerWithReuse() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    TaskSchedulerWithDrainableAppCallback scheduler =
+      new TaskSchedulerWithDrainableAppCallback(
+        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+        appUrl, mockRMClient, mockAppContext);
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+        .getDrainableAppCallback();
+
+    Configuration conf = new Configuration();
+    // to match all in the same pass
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    // to release immediately after deallocate
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+    scheduler.init(conf);
+    drainableAppCallback.drain();
+
+    RegisterApplicationMasterResponse mockRegResponse =
+                                mock(RegisterApplicationMasterResponse.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMaximumResourceCapability()).
+                                                   thenReturn(mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    when(mockRMClient.
+          registerApplicationMaster(anyString(), anyInt(), anyString())).
+                                                   thenReturn(mockRegResponse);
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getAvailableResources()).
+                                              thenReturn(mockClusterResource);
+
+    scheduler.start();
+    drainableAppCallback.drain();
+
+    Object mockTask1 = mock(Object.class);
+    Object mockCookie1 = mock(Object.class);
+    Resource mockCapability = mock(Resource.class);
+    String[] hosts = {"host1", "host5"};
+    String[] racks = {"/default-rack", "/default-rack"};
+    final Priority mockPriority1 = Priority.newInstance(1);
+    final Priority mockPriority2 = Priority.newInstance(2);
+    final Priority mockPriority3 = Priority.newInstance(3);
+    Object mockTask2 = mock(Object.class);
+    Object mockCookie2 = mock(Object.class);
+    Object mockTask3 = mock(Object.class);
+    Object mockCookie3 = mock(Object.class);
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
+        ArgumentCaptor.forClass(CookieContainerRequest.class);
+
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
+        racks, mockPriority1, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request1 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask2, mockCapability, hosts,
+        racks, mockPriority2, null, mockCookie2);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(2)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request2 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask3, mockCapability, hosts,
+        racks, mockPriority3, null, mockCookie3);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(3)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request3 = requestCaptor.getValue();
+
+    List<Container> containers = new ArrayList<Container>();
+    // sending lower priority container first to make sure its not matched
+    Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
+    when(mockContainer4.getPriority()).thenReturn(mockPriority3);
+    ContainerId mockCId4 = mock(ContainerId.class);
+    when(mockContainer4.getId()).thenReturn(mockCId4);
+    containers.add(mockContainer4);
+    Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+    when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+    ContainerId mockCId1 = mock(ContainerId.class);
+    when(mockContainer1.getId()).thenReturn(mockCId1);
+    containers.add(mockContainer1);
+    Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+    when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+    ContainerId mockCId2 = mock(ContainerId.class);
+    when(mockContainer2.getId()).thenReturn(mockCId2);
+    containers.add(mockContainer2);
+    Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
+    when(mockContainer3.getPriority()).thenReturn(mockPriority3);
+    ContainerId mockCId3 = mock(ContainerId.class);
+    when(mockContainer3.getId()).thenReturn(mockCId3);
+    containers.add(mockContainer3);
+
+    ArrayList<CookieContainerRequest> hostContainers =
+                             new ArrayList<CookieContainerRequest>();
+    hostContainers.add(request1);
+    hostContainers.add(request2);
+    hostContainers.add(request3);
+    ArrayList<CookieContainerRequest> rackContainers =
+                             new ArrayList<CookieContainerRequest>();
+    rackContainers.add(request2);
+    rackContainers.add(request3);
+    ArrayList<CookieContainerRequest> anyContainers =
+                             new ArrayList<CookieContainerRequest>();
+    anyContainers.add(request3);
+
+    final List<ArrayList<CookieContainerRequest>> hostList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    hostList.add(hostContainers);
+    final List<ArrayList<CookieContainerRequest>> rackList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    rackList.add(rackContainers);
+    final List<ArrayList<CookieContainerRequest>> anyList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    anyList.add(anyContainers);
+    final List<ArrayList<CookieContainerRequest>> emptyList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    // return all requests for host1
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return hostList;
+          }
+
+        });
+    // first request matched by host
+    // second request matched to rack. RackResolver by default puts hosts in
+    // /default-rack. We need to workaround by returning rack matches only once
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return rackList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    // third request matched to ANY
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    
+    final AtomicInteger count = new AtomicInteger(0);
+    Mockito.doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        count.incrementAndGet();
+        return null;
+      }})
+    .when(mockApp).taskAllocated(any(), any(), (Container)any());
+    when(mockRMClient.getTopPriority()).then(        
+        new Answer<Priority>() {
+          @Override
+          public Priority answer(
+              InvocationOnMock invocation) throws Throwable {
+            int allocations = count.get();
+            if (allocations == 0) {
+              return mockPriority1;
+            }
+            if (allocations == 1) {
+              return mockPriority2;
+            }
+            if (allocations == 2) {
+              return mockPriority3;
+            }
+            return null;
+          }
+        });
+    
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    scheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    
+    scheduler.onContainersAllocated(containers);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    // exact number allocations returned
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    // first container allocated
+    verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);
+    verify(mockApp).taskAllocated(mockTask2, mockCookie2, mockContainer2);
+    verify(mockApp).taskAllocated(mockTask3, mockCookie3, mockContainer3);
+    verify(mockRMClient).removeContainerRequest(request1);
+    verify(mockRMClient).removeContainerRequest(request2);
+    verify(mockRMClient).removeContainerRequest(request3);
+    // verify unwanted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId4);
+
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask1, true));
     verify(mockApp).containerBeingReleased(mockCId1);
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
     // deallocate allocated container

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 7b9a962..bd81618 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -328,5 +329,15 @@ class TestTaskSchedulerHelpers {
       return false;
     }
   }
+
+  // Blocks until drainNotifier is true. The flag is re-checked under its monitor so a
+  // notifyAll() issued after set(true) cannot slip in between the check and wait().
+  static void waitForDelayedDrainNotify(AtomicBoolean drainNotifier) throws InterruptedException {
+    synchronized (drainNotifier) {
+      while (!drainNotifier.get()) {
+        drainNotifier.wait();
+      }
+    }
+  }
 
 }