You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/08 09:31:25 UTC
git commit: TEZ-588. TaskScheduler may end up releasing containers
and getting stuck without enough resources (bikas)
Updated Branches:
refs/heads/master 5c217f3c5 -> ae541f78d
TEZ-588. TaskScheduler may end up releasing containers and getting stuck without enough resources (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae541f78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae541f78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae541f78
Branch: refs/heads/master
Commit: ae541f78de02fb7418ae2955bf8da9557e9901c6
Parents: 5c217f3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 8 00:28:03 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 8 00:28:03 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/TaskScheduler.java | 50 ++-
.../tez/dag/app/rm/TezAMRMClientAsync.java | 9 +
.../tez/dag/app/rm/TestContainerReuse.java | 21 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 303 ++++++++++++++++++-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 11 +
5 files changed, 373 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 4ea9c74..ccc4a94 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -539,7 +539,8 @@ public class TaskScheduler extends AbstractService
long currentTime = System.currentTimeMillis();
if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
&& sessionDelay != -1)) {
- LOG.info("Container's session delay expired or is new. Releasing container"
+ LOG.info("No taskRequests. Container's session delay expired or is new. " +
+ "Releasing container"
+ ", containerId=" + heldContainer.container.getId()
+ ", containerExpiryTime="
+ heldContainer.getContainerExpiryTime()
@@ -626,7 +627,8 @@ public class TaskScheduler extends AbstractService
long currentTime = System.currentTimeMillis();
// Release container if final expiry time is reached
- if (heldContainer.getContainerExpiryTime() <= currentTime
+ // Dont release a new container. The RM may not give us new ones
+ if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
&& sessionDelay != -1) {
LOG.info("Container's session delay expired. Releasing container"
+ ", containerId=" + heldContainer.container.getId()
@@ -660,11 +662,24 @@ public class TaskScheduler extends AbstractService
hitFinalMatchLevel = false;
}
}
-
+
if (hitFinalMatchLevel) {
+ boolean safeToRelease = true;
+ Priority topPendingPriority = amRmClient.getTopPriority();
+ Priority containerPriority = heldContainer.container.getPriority();
+ if (topPendingPriority != null &&
+ containerPriority.compareTo(topPendingPriority) < 0) {
+ // this container is of lower priority and given to us by the RM for
+ // a task that will be matched after the current top priority. Keep
+ // this container for those pending tasks since the RM is not going
+ // to give this container to us again
+ safeToRelease = false;
+ }
+
// Are there any pending requests at any priority?
// release if there are tasks or this is not a session
- if (!taskRequests.isEmpty() || !appContext.isSession()) {
+ if (safeToRelease &&
+ (!taskRequests.isEmpty() || !appContext.isSession())) {
LOG.info("Releasing held container as either there are pending but "
+ " unmatched requests or this is not a session"
+ ", containerId=" + heldContainer.container.getId()
@@ -1151,6 +1166,31 @@ public class TaskScheduler extends AbstractService
Map<CookieContainerRequest, Container> assignedContainers,
boolean honorLocality) {
+ Priority containerPriority = container.getPriority();
+ Priority topPendingTaskPriority = amRmClient.getTopPriority();
+ if (topPendingTaskPriority == null) {
+ // nothing left to assign
+ return false;
+ }
+
+ if (topPendingTaskPriority.compareTo(containerPriority) > 0) {
+ // if the next task to assign is higher priority than the container then
+ // dont assign this container to that task.
+ // if task and container are equal priority - then its first use or reuse
+ // within the same priority - safe to use
+ // if task is lower priority than container then its we use a container that
+ // is no longer needed by higher priority tasks All those higher pri tasks
+ // have been assigned resources - safe to use (first use or reuse)
+ // if task is higher priority than container then we may end up using a
+ // container that was assigned by the RM for a lower priority pending task
+ // that will be assigned after this higher priority task is assigned. If we
+ // use that task's container now then we may not be able to match this
+ // container to that task later on. However the RM has already assigned us
+ // all containers and is not going to give us new containers. We will get
+ // stuck for resources.
+ return false;
+ }
+
CookieContainerRequest assigned =
assigner.assignReUsedContainer(container, honorLocality);
if (assigned != null) {
@@ -1163,7 +1203,7 @@ public class TaskScheduler extends AbstractService
private void releaseUnassignedContainers(Iterable<Container> containers) {
for (Container container : containers) {
releaseContainer(container.getId());
- LOG.info("Releasing container, No RM requests matching container: "
+ LOG.info("Releasing unused container: "
+ container);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index 5d4e0bc..487c57a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -71,6 +71,15 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
AMRMClientAsync.CallbackHandler callbackHandler) {
super(client, intervalMs, callbackHandler);
}
+
+ public synchronized Priority getTopPriority() {
+ Iterator<Priority> iter =
+ knownRequestsByPriority.descendingKeySet().iterator();
+ if (!iter.hasNext()) {
+ return null;
+ }
+ return iter.next();
+ }
@Override
public synchronized void addContainerRequest(T req) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index d14c74f..4e30763 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -84,15 +84,6 @@ import org.junit.Test;
import com.google.common.collect.Lists;
public class TestContainerReuse {
-
- void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
- throws InterruptedException {
- while (!drainNotifier.get()) {
- synchronized (drainNotifier) {
- drainNotifier.wait();
- }
- }
- }
@Test(timeout = 15000l)
public void testDelayedReuseContainerBecomesAvailable()
@@ -190,7 +181,7 @@ public class TestContainerReuse {
drainNotifier.set(false);
taskScheduler.onContainersAllocated(
Lists.newArrayList(containerHost1, containerHost2));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
eq(ta11), any(Object.class), eq(containerHost1));
@@ -319,7 +310,7 @@ public class TestContainerReuse {
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
@@ -424,7 +415,7 @@ public class TestContainerReuse {
// One container allocated.
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
@@ -461,7 +452,7 @@ public class TestContainerReuse {
// Second container allocated. Should be allocated to the last task.
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
@@ -562,7 +553,7 @@ public class TestContainerReuse {
// One container allocated.
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
eq(ta11), any(Object.class), eq(container1));
@@ -688,7 +679,7 @@ public class TestContainerReuse {
// One container allocated.
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
- waitForDelayedDrainNotify(drainNotifier);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
eq(ta11), any(Object.class), eq(container1));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 1cffd1d..1ae7d70 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -40,6 +40,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -72,6 +74,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -83,7 +86,7 @@ public class TestTaskScheduler {
@SuppressWarnings({ "unchecked" })
@Test
- public void testTaskScheduler() throws Exception {
+ public void testTaskSchedulerNoReuse() throws Exception {
RackResolver.init(new YarnConfiguration());
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
AppContext mockAppContext = mock(AppContext.class);
@@ -307,6 +310,304 @@ public class TestTaskScheduler {
// deallocate allocated task
assertTrue(scheduler.deallocateTask(mockTask1, true));
+ drainableAppCallback.drain();
+ verify(mockApp).containerBeingReleased(mockCId1);
+ verify(mockRMClient).releaseAssignedContainer(mockCId1);
+ // deallocate allocated container
+ Assert.assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
+ drainableAppCallback.drain();
+ verify(mockRMClient).releaseAssignedContainer(mockCId2);
+ verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+ List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+ ContainerStatus mockStatus1 = mock(ContainerStatus.class);
+ when(mockStatus1.getContainerId()).thenReturn(mockCId1);
+ statuses.add(mockStatus1);
+ ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+ when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+ statuses.add(mockStatus2);
+ ContainerStatus mockStatus3 = mock(ContainerStatus.class);
+ when(mockStatus3.getContainerId()).thenReturn(mockCId3);
+ statuses.add(mockStatus3);
+ ContainerStatus mockStatus4 = mock(ContainerStatus.class);
+ when(mockStatus4.getContainerId()).thenReturn(mockCId4);
+ statuses.add(mockStatus4);
+
+ scheduler.onContainersCompleted(statuses);
+ drainableAppCallback.drain();
+ // released container status returned
+ verify(mockApp).containerCompleted(mockTask1, mockStatus1);
+ verify(mockApp).containerCompleted(mockTask2, mockStatus2);
+ // currently allocated container status returned and not released
+ verify(mockApp).containerCompleted(mockTask3, mockStatus3);
+ // no other statuses returned
+ verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
+ verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+
+ float progress = 0.5f;
+ when(mockApp.getProgress()).thenReturn(progress);
+ Assert.assertEquals(progress, scheduler.getProgress(), 0);
+
+ List<NodeReport> mockUpdatedNodes = mock(List.class);
+ scheduler.onNodesUpdated(mockUpdatedNodes);
+ drainableAppCallback.drain();
+ verify(mockApp).nodesUpdated(mockUpdatedNodes);
+
+ Exception mockException = mock(Exception.class);
+ scheduler.onError(mockException);
+ drainableAppCallback.drain();
+ verify(mockApp).onError(mockException);
+
+ scheduler.onShutdownRequest();
+ drainableAppCallback.drain();
+ verify(mockApp).appShutdownRequested();
+
+ String appMsg = "success";
+ AppFinalStatus finalStatus =
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+ when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+ scheduler.stop();
+ drainableAppCallback.drain();
+ verify(mockRMClient).
+ unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ appMsg, appUrl);
+ verify(mockRMClient).stop();
+ scheduler.close();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testTaskSchedulerWithReuse() throws Exception {
+ RackResolver.init(new YarnConfiguration());
+ TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+ AppContext mockAppContext = mock(AppContext.class);
+ when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+ mock(TezAMRMClientAsync.class);
+
+ String appHost = "host";
+ int appPort = 0;
+ String appUrl = "url";
+ TaskSchedulerWithDrainableAppCallback scheduler =
+ new TaskSchedulerWithDrainableAppCallback(
+ mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+ appUrl, mockRMClient, mockAppContext);
+ TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+ .getDrainableAppCallback();
+
+ Configuration conf = new Configuration();
+ // to match all in the same pass
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ // to release immediately after deallocate
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+ scheduler.init(conf);
+ drainableAppCallback.drain();
+
+ RegisterApplicationMasterResponse mockRegResponse =
+ mock(RegisterApplicationMasterResponse.class);
+ Resource mockMaxResource = mock(Resource.class);
+ Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+ when(mockRegResponse.getMaximumResourceCapability()).
+ thenReturn(mockMaxResource);
+ when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+ when(mockRMClient.
+ registerApplicationMaster(anyString(), anyInt(), anyString())).
+ thenReturn(mockRegResponse);
+ Resource mockClusterResource = mock(Resource.class);
+ when(mockRMClient.getAvailableResources()).
+ thenReturn(mockClusterResource);
+
+ scheduler.start();
+ drainableAppCallback.drain();
+
+ Object mockTask1 = mock(Object.class);
+ Object mockCookie1 = mock(Object.class);
+ Resource mockCapability = mock(Resource.class);
+ String[] hosts = {"host1", "host5"};
+ String[] racks = {"/default-rack", "/default-rack"};
+ final Priority mockPriority1 = Priority.newInstance(1);
+ final Priority mockPriority2 = Priority.newInstance(2);
+ final Priority mockPriority3 = Priority.newInstance(3);
+ Object mockTask2 = mock(Object.class);
+ Object mockCookie2 = mock(Object.class);
+ Object mockTask3 = mock(Object.class);
+ Object mockCookie3 = mock(Object.class);
+ ArgumentCaptor<CookieContainerRequest> requestCaptor =
+ ArgumentCaptor.forClass(CookieContainerRequest.class);
+
+ scheduler.allocateTask(mockTask1, mockCapability, hosts,
+ racks, mockPriority1, null, mockCookie1);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request1 = requestCaptor.getValue();
+ scheduler.allocateTask(mockTask2, mockCapability, hosts,
+ racks, mockPriority2, null, mockCookie2);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(2)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request2 = requestCaptor.getValue();
+ scheduler.allocateTask(mockTask3, mockCapability, hosts,
+ racks, mockPriority3, null, mockCookie3);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(3)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request3 = requestCaptor.getValue();
+
+ List<Container> containers = new ArrayList<Container>();
+ // sending lower priority container first to make sure its not matched
+ Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
+ when(mockContainer4.getPriority()).thenReturn(mockPriority3);
+ ContainerId mockCId4 = mock(ContainerId.class);
+ when(mockContainer4.getId()).thenReturn(mockCId4);
+ containers.add(mockContainer4);
+ Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+ when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+ ContainerId mockCId1 = mock(ContainerId.class);
+ when(mockContainer1.getId()).thenReturn(mockCId1);
+ containers.add(mockContainer1);
+ Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+ when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+ ContainerId mockCId2 = mock(ContainerId.class);
+ when(mockContainer2.getId()).thenReturn(mockCId2);
+ containers.add(mockContainer2);
+ Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
+ when(mockContainer3.getPriority()).thenReturn(mockPriority3);
+ ContainerId mockCId3 = mock(ContainerId.class);
+ when(mockContainer3.getId()).thenReturn(mockCId3);
+ containers.add(mockContainer3);
+
+ ArrayList<CookieContainerRequest> hostContainers =
+ new ArrayList<CookieContainerRequest>();
+ hostContainers.add(request1);
+ hostContainers.add(request2);
+ hostContainers.add(request3);
+ ArrayList<CookieContainerRequest> rackContainers =
+ new ArrayList<CookieContainerRequest>();
+ rackContainers.add(request2);
+ rackContainers.add(request3);
+ ArrayList<CookieContainerRequest> anyContainers =
+ new ArrayList<CookieContainerRequest>();
+ anyContainers.add(request3);
+
+ final List<ArrayList<CookieContainerRequest>> hostList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ hostList.add(hostContainers);
+ final List<ArrayList<CookieContainerRequest>> rackList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ rackList.add(rackContainers);
+ final List<ArrayList<CookieContainerRequest>> anyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ anyList.add(anyContainers);
+ final List<ArrayList<CookieContainerRequest>> emptyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ // return all requests for host1
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return hostList;
+ }
+
+ });
+ // first request matched by host
+ // second request matched to rack. RackResolver by default puts hosts in
+ // /default-rack. We need to workaround by returning rack matches only once
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return rackList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+ // third request matched to ANY
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+
+ final AtomicInteger count = new AtomicInteger(0);
+ Mockito.doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ count.incrementAndGet();
+ return null;
+ }})
+ .when(mockApp).taskAllocated(any(), any(), (Container)any());
+ when(mockRMClient.getTopPriority()).then(
+ new Answer<Priority>() {
+ @Override
+ public Priority answer(
+ InvocationOnMock invocation) throws Throwable {
+ int allocations = count.get();
+ if (allocations == 0) {
+ return mockPriority1;
+ }
+ if (allocations == 1) {
+ return mockPriority2;
+ }
+ if (allocations == 2) {
+ return mockPriority3;
+ }
+ return null;
+ }
+ });
+
+ AtomicBoolean drainNotifier = new AtomicBoolean(false);
+ scheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+
+ scheduler.onContainersAllocated(containers);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ // exact number allocations returned
+ verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+ // first container allocated
+ verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);
+ verify(mockApp).taskAllocated(mockTask2, mockCookie2, mockContainer2);
+ verify(mockApp).taskAllocated(mockTask3, mockCookie3, mockContainer3);
+ verify(mockRMClient).removeContainerRequest(request1);
+ verify(mockRMClient).removeContainerRequest(request2);
+ verify(mockRMClient).removeContainerRequest(request3);
+ // verify unwanted container released
+ verify(mockRMClient).releaseAssignedContainer(mockCId4);
+
+ // deallocate allocated task
+ assertTrue(scheduler.deallocateTask(mockTask1, true));
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
// deallocate allocated container
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae541f78/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 7b9a962..bd81618 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -328,5 +329,15 @@ class TestTaskSchedulerHelpers {
return false;
}
}
+
+
+ static void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
+ throws InterruptedException {
+ while (!drainNotifier.get()) {
+ synchronized (drainNotifier) {
+ drainNotifier.wait();
+ }
+ }
+ }
}