You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/08/21 07:33:23 UTC
tez git commit: TEZ-2687. ATS History shutdown happens before the
min-held containers are released (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 9bb01e4f8 -> 663ead2dc
TEZ-2687. ATS History shutdown happens before the min-held containers are released (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/663ead2d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/663ead2d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/663ead2d
Branch: refs/heads/master
Commit: 663ead2dc5f3778bbdeb0f479811e2d89f506afc
Parents: 9bb01e4
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Aug 21 13:33:03 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Aug 21 13:33:03 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +
.../dag/app/rm/LocalTaskSchedulerService.java | 5 +
.../dag/app/rm/TaskSchedulerEventHandler.java | 6 +-
.../tez/dag/app/rm/TaskSchedulerService.java | 2 +
.../dag/app/rm/YarnTaskSchedulerService.java | 53 ++++-
.../tez/dag/app/rm/TestTaskScheduler.java | 226 ++++++++++++++++++-
7 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a662484..8fe9627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2730. tez-api missing dependency on org.codehaus.jettison for json.
TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
@@ -74,6 +75,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2540. Create both tez-dist minimal and minimal.tar.gz formats as part of build
TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -296,6 +298,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -504,6 +507,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b91c3d1..401bfbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1857,6 +1857,10 @@ public class DAGAppMaster extends AbstractService {
}
+ private void initiateStop() {
+ taskSchedulerEventHandler.initiateStop();
+ }
+
@Override
public void serviceStop() throws Exception {
if (isSession) {
@@ -1866,6 +1870,8 @@ public class DAGAppMaster extends AbstractService {
if (this.dagSubmissionTimer != null) {
this.dagSubmissionTimer.cancel();
}
+ // release all the held containers before stopping services (TEZ-2687)
+ initiateStop();
stopServices();
// Given pre-emption, we should delete tez scratch dir only if unregister is
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 51d8b9d..2c7c10c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -194,6 +194,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
return true;
}
+ @Override
+ public void initiateStop() {
+
+ }
+
static class LocalContainerFactory {
final AppContext appContext;
AtomicInteger nextId;
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index a3cd284..4428665 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -394,7 +394,11 @@ public class TaskSchedulerEventHandler extends AbstractService
protected void notifyForTest() {
}
-
+
+ public void initiateStop() {
+ taskScheduler.initiateStop();
+ }
+
@Override
public void serviceStop() {
synchronized(this) {
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 48d5455..7b729e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -70,6 +70,8 @@ public abstract class TaskSchedulerService extends AbstractService{
public abstract boolean hasUnregistered();
+ public abstract void initiateStop();
+
public interface TaskSchedulerAppCallback {
public class AppFinalStatus {
public final FinalApplicationStatus exitStatus;
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 19902b3..73fcb3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.rm;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -133,7 +135,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
final AppContext appContext;
private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
- AtomicBoolean isStopped = new AtomicBoolean(false);
+ AtomicBoolean isStopStarted = new AtomicBoolean(false);
private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
@@ -393,7 +395,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// Wait for contianers to be released.
delayedContainerManager.join(2000l);
synchronized (this) {
- isStopped.set(true);
if (shouldUnregister.get()) {
AppFinalStatus status = appClientDelegate.getFinalAppStatus();
LOG.info("Unregistering application from RM"
@@ -426,7 +427,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// AMRMClientAsync interface methods
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
+ for (ContainerStatus status : statuses) {
+ LOG.info("Container " + status.getContainerId() + " is completed");
+ }
return;
}
Map<Object, ContainerStatus> appContainerStatus =
@@ -483,7 +487,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onContainersAllocated(List<Container> containers) {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
+ for (Container container : containers) {
+ LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
+ releaseContainer(container.getId());
+ }
return;
}
Map<CookieContainerRequest, Container> assignedContainers;
@@ -857,7 +865,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onShutdownRequest() {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
return;
}
// upcall to app must be outside locks
@@ -866,7 +874,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
return;
}
// ignore bad nodes for now
@@ -876,7 +884,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public float getProgress() {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
return 1;
}
@@ -898,7 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onError(Throwable t) {
- if (isStopped.get()) {
+ if (isStopStarted.get()) {
+ LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t));
return;
}
appClientDelegate.onError(t);
@@ -1064,6 +1073,34 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
return null;
}
+ @Override
+ public synchronized void initiateStop() {
+ LOG.info("Initiate stop to YarnTaskScheduler");
+ // release held containers
+ LOG.info("Release held containers");
+ isStopStarted.set(true);
+ // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
+ // because method releaseContainer will change heldContainers.
+ List<ContainerId> heldContainerIds = new ArrayList<ContainerId>(heldContainers.size());
+ for (ContainerId containerId : heldContainers.keySet()) {
+ heldContainerIds.add(containerId);
+ }
+ for (ContainerId containerId : heldContainerIds) {
+ releaseContainer(containerId);
+ }
+
+ // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
+ LOG.info("Remove all the taskRequests");
+ // Create a new list for tasks to avoid ConcurrentModificationException
+ List<Object> tasks = new ArrayList<Object>(taskRequests.size());
+ for (Object task : taskRequests.keySet()) {
+ tasks.add(task);
+ }
+ for (Object task : tasks) {
+ removeTaskRequest(task);
+ }
+ }
+
boolean canFit(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index dabae67..7e2c674 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -503,7 +503,231 @@ public class TestTaskScheduler {
verify(mockRMClient).stop();
scheduler.close();
}
-
+
+ @SuppressWarnings({ "unchecked" })
+ @Test(timeout=10000)
+ public void testTaskSchedulerInitiateStop() throws Exception {
+ RackResolver.init(new YarnConfiguration());
+ TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+ AppContext mockAppContext = mock(AppContext.class);
+ when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+ mock(TezAMRMClientAsync.class);
+
+ String appHost = "host";
+ int appPort = 0;
+ String appUrl = "url";
+ TaskSchedulerWithDrainableAppCallback scheduler =
+ new TaskSchedulerWithDrainableAppCallback(
+ mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+ appUrl, mockRMClient, mockAppContext);
+ final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+ .getDrainableAppCallback();
+
+ Configuration conf = new Configuration();
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ // keep containers held for 10 seconds
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000);
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000);
+ scheduler.init(conf);
+ drainableAppCallback.drain();
+
+ RegisterApplicationMasterResponse mockRegResponse =
+ mock(RegisterApplicationMasterResponse.class);
+ Resource mockMaxResource = mock(Resource.class);
+ Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+ when(mockRegResponse.getMaximumResourceCapability()).
+ thenReturn(mockMaxResource);
+ when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+ when(mockRMClient.
+ registerApplicationMaster(anyString(), anyInt(), anyString())).
+ thenReturn(mockRegResponse);
+ Resource mockClusterResource = mock(Resource.class);
+ when(mockRMClient.getAvailableResources()).
+ thenReturn(mockClusterResource);
+
+ scheduler.start();
+ drainableAppCallback.drain();
+
+ Object mockTask1 = mock(Object.class);
+ when(mockTask1.toString()).thenReturn("task1");
+ Object mockCookie1 = mock(Object.class);
+ Resource mockCapability = mock(Resource.class);
+ String[] hosts = {"host1", "host5"};
+ String[] racks = {"/default-rack", "/default-rack"};
+ final Priority mockPriority1 = Priority.newInstance(1);
+ final Priority mockPriority2 = Priority.newInstance(2);
+ final Priority mockPriority3 = Priority.newInstance(3);
+ final Priority mockPriority4 = Priority.newInstance(4);
+ final Priority mockPriority5 = Priority.newInstance(5);
+ Object mockTask2 = mock(Object.class);
+ when(mockTask2.toString()).thenReturn("task2");
+ Object mockCookie2 = mock(Object.class);
+ Object mockTask3 = mock(Object.class);
+ when(mockTask3.toString()).thenReturn("task3");
+ Object mockCookie3 = mock(Object.class);
+ ArgumentCaptor<CookieContainerRequest> requestCaptor =
+ ArgumentCaptor.forClass(CookieContainerRequest.class);
+
+ scheduler.allocateTask(mockTask1, mockCapability, hosts,
+ racks, mockPriority1, null, mockCookie1);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request1 = requestCaptor.getValue();
+ scheduler.allocateTask(mockTask2, mockCapability, hosts,
+ racks, mockPriority2, null, mockCookie2);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(2)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request2 = requestCaptor.getValue();
+ scheduler.allocateTask(mockTask3, mockCapability, hosts,
+ racks, mockPriority3, null, mockCookie3);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(3)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request3 = requestCaptor.getValue();
+
+ List<Container> containers = new ArrayList<Container>();
+ // sending lower priority container first to make sure it's not matched
+ Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+ when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+ when(mockContainer1.toString()).thenReturn("container1");
+ ContainerId mockCId1 = mock(ContainerId.class);
+ when(mockContainer1.getId()).thenReturn(mockCId1);
+ when(mockCId1.toString()).thenReturn("container1");
+ containers.add(mockContainer1);
+ Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+ when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+ when(mockContainer2.toString()).thenReturn("container2");
+ ContainerId mockCId2 = mock(ContainerId.class);
+ when(mockContainer2.getId()).thenReturn(mockCId2);
+ when(mockCId2.toString()).thenReturn("container2");
+ containers.add(mockContainer2);
+
+ ArrayList<CookieContainerRequest> hostContainers =
+ new ArrayList<CookieContainerRequest>();
+ hostContainers.add(request1);
+ ArrayList<CookieContainerRequest> rackContainers =
+ new ArrayList<CookieContainerRequest>();
+ rackContainers.add(request2);
+ ArrayList<CookieContainerRequest> anyContainers =
+ new ArrayList<CookieContainerRequest>();
+ anyContainers.add(request3);
+
+ final List<ArrayList<CookieContainerRequest>> hostList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ hostList.add(hostContainers);
+ final List<ArrayList<CookieContainerRequest>> rackList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ rackList.add(rackContainers);
+ final List<ArrayList<CookieContainerRequest>> anyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ anyList.add(anyContainers);
+ final List<ArrayList<CookieContainerRequest>> emptyList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ // return pri1 requests for host1
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return hostList;
+ }
+
+ });
+ // second request matched to rack. RackResolver by default puts hosts in
+ // /default-rack. We need to work around this by returning rack matches only once
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return rackList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+ // third request matched to ANY
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+
+ when(mockRMClient.getTopPriority()).then(
+ new Answer<Priority>() {
+ @Override
+ public Priority answer(
+ InvocationOnMock invocation) throws Throwable {
+ int allocations = drainableAppCallback.count.get();
+ if (allocations == 0) {
+ return mockPriority1;
+ }
+ if (allocations == 1) {
+ return mockPriority2;
+ }
+ if (allocations == 2) {
+ return mockPriority3;
+ }
+ if (allocations == 3) {
+ return mockPriority4;
+ }
+ return null;
+ }
+ });
+
+ AtomicBoolean drainNotifier = new AtomicBoolean(false);
+ scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+ scheduler.onContainersAllocated(containers);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+
+ Assert.assertEquals(2, scheduler.heldContainers.size());
+ Assert.assertEquals(1, scheduler.taskRequests.size());
+ // 2 containers are allocated and their corresponding taskRequests are removed.
+ verify(mockRMClient).removeContainerRequest(request1);
+ verify(mockRMClient).removeContainerRequest(request2);
+
+ scheduler.initiateStop();
+ // verify all the containers are released
+ Assert.assertEquals(0, scheduler.heldContainers.size());
+ verify(mockRMClient).releaseAssignedContainer(mockCId1);
+ verify(mockRMClient).releaseAssignedContainer(mockCId2);
+ // verify taskRequests are removed
+ Assert.assertEquals(0, scheduler.taskRequests.size());
+ verify(mockRMClient).removeContainerRequest(request3);
+ }
+
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerWithReuse() throws Exception {