You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/08/21 07:33:23 UTC

tez git commit: TEZ-2687. ATS History shutdown happens before the min-held containers are released (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 9bb01e4f8 -> 663ead2dc


TEZ-2687. ATS History shutdown happens before the min-held containers are released (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/663ead2d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/663ead2d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/663ead2d

Branch: refs/heads/master
Commit: 663ead2dc5f3778bbdeb0f479811e2d89f506afc
Parents: 9bb01e4
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Aug 21 13:33:03 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Aug 21 13:33:03 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   6 +
 .../dag/app/rm/LocalTaskSchedulerService.java   |   5 +
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   6 +-
 .../tez/dag/app/rm/TaskSchedulerService.java    |   2 +
 .../dag/app/rm/YarnTaskSchedulerService.java    |  53 ++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 226 ++++++++++++++++++-
 7 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a662484..8fe9627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2730. tez-api missing dependency on org.codehaus.jettison for json.
   TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
@@ -74,6 +75,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2540. Create both tez-dist minimal and minimal.tar.gz formats as part of build
   TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -296,6 +298,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2630. TezChild receives IP address instead of FQDN.
 
@@ -504,6 +507,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2630. TezChild receives IP address instead of FQDN.
   TEZ-2635. Limit number of attempts being downloaded in unordered fetch.

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b91c3d1..401bfbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1857,6 +1857,10 @@ public class DAGAppMaster extends AbstractService {
   }
 
 
+  private void initiateStop() {
+    taskSchedulerEventHandler.initiateStop();
+  }
+
   @Override
   public void serviceStop() throws Exception {
     if (isSession) {
@@ -1866,6 +1870,8 @@ public class DAGAppMaster extends AbstractService {
       if (this.dagSubmissionTimer != null) {
         this.dagSubmissionTimer.cancel();
       }
+      // Release all held containers before stopping the services (TEZ-2687).
+      initiateStop();
       stopServices();
 
       // Given pre-emption, we should delete tez scratch dir only if unregister is

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 51d8b9d..2c7c10c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -194,6 +194,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     return true;
   }
 
+  @Override
+  public void initiateStop() {
+
+  }
+
   static class LocalContainerFactory {
     final AppContext appContext;
     AtomicInteger nextId;

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index a3cd284..4428665 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -394,7 +394,11 @@ public class TaskSchedulerEventHandler extends AbstractService
   
   protected void notifyForTest() {
   }
-  
+
+  public void initiateStop() {
+    taskScheduler.initiateStop();
+  }
+
   @Override
   public void serviceStop() {
     synchronized(this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 48d5455..7b729e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -70,6 +70,8 @@ public abstract class TaskSchedulerService extends AbstractService{
 
   public abstract boolean hasUnregistered();
 
+  public abstract void initiateStop();
+
   public interface TaskSchedulerAppCallback {
     public class AppFinalStatus {
       public final FinalApplicationStatus exitStatus;

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 19902b3..73fcb3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -133,7 +135,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   final AppContext appContext;
   private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
 
-  AtomicBoolean isStopped = new AtomicBoolean(false);
+  AtomicBoolean isStopStarted = new AtomicBoolean(false);
 
   private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
   private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
@@ -393,7 +395,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // Wait for contianers to be released.
       delayedContainerManager.join(2000l);
       synchronized (this) {
-        isStopped.set(true);
         if (shouldUnregister.get()) {
           AppFinalStatus status = appClientDelegate.getFinalAppStatus();
           LOG.info("Unregistering application from RM"
@@ -426,7 +427,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   // AMRMClientAsync interface methods
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
+      for (ContainerStatus status : statuses) {
+        LOG.info("Container " + status.getContainerId() + " is completed");
+      }
       return;
     }
     Map<Object, ContainerStatus> appContainerStatus =
@@ -483,7 +487,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public void onContainersAllocated(List<Container> containers) {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
+      for (Container container : containers) {
+        LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
+        releaseContainer(container.getId());
+      }
       return;
     }
     Map<CookieContainerRequest, Container> assignedContainers;
@@ -857,7 +865,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public void onShutdownRequest() {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
       return;
     }
     // upcall to app must be outside locks
@@ -866,7 +874,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public void onNodesUpdated(List<NodeReport> updatedNodes) {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
       return;
     }
     // ignore bad nodes for now
@@ -876,7 +884,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public float getProgress() {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
       return 1;
     }
 
@@ -898,7 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public void onError(Throwable t) {
-    if (isStopped.get()) {
+    if (isStopStarted.get()) {
+      LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t));
       return;
     }
     appClientDelegate.onError(t);
@@ -1064,6 +1073,34 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     return null;
   }
 
+  @Override
+  public synchronized void initiateStop() {
+    LOG.info("Initiate stop to YarnTaskScheduler");
+    // release held containers
+    LOG.info("Release held containers");
+    isStopStarted.set(true);
+    // Iterate over a copy of the container IDs; otherwise a ConcurrentModificationException would be
+    // thrown, because releaseContainer modifies heldContainers.
+    List<ContainerId> heldContainerIds = new ArrayList<ContainerId>(heldContainers.size());
+    for (ContainerId containerId : heldContainers.keySet()) {
+      heldContainerIds.add(containerId);
+    }
+    for (ContainerId containerId : heldContainerIds) {
+      releaseContainer(containerId);
+    }
+
+    // Remove the taskRequests from AMRMClient so that no new containers are allocated in the next heartbeat.
+    LOG.info("Remove all the taskRequests");
+    // Create a new list for tasks to avoid ConcurrentModificationException
+    List<Object> tasks = new ArrayList<Object>(taskRequests.size());
+    for (Object task : taskRequests.keySet()) {
+      tasks.add(task);
+    }
+    for (Object task : tasks) {
+      removeTaskRequest(task);
+    }
+  }
+
   boolean canFit(Resource arg0, Resource arg1) {
     int mem0 = arg0.getMemory();
     int mem1 = arg1.getMemory();

http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index dabae67..7e2c674 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -503,7 +503,231 @@ public class TestTaskScheduler {
     verify(mockRMClient).stop();
     scheduler.close();
   }
-  
+
+  @SuppressWarnings({ "unchecked" })
+  @Test(timeout=10000)
+  public void testTaskSchedulerInitiateStop() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    TaskSchedulerWithDrainableAppCallback scheduler =
+      new TaskSchedulerWithDrainableAppCallback(
+        mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
+        appUrl, mockRMClient, mockAppContext);
+    final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+        .getDrainableAppCallback();
+
+    Configuration conf = new Configuration();
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    // keep containers held for 10 seconds
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000);
+    scheduler.init(conf);
+    drainableAppCallback.drain();
+
+    RegisterApplicationMasterResponse mockRegResponse =
+                                mock(RegisterApplicationMasterResponse.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMaximumResourceCapability()).
+                                                   thenReturn(mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    when(mockRMClient.
+          registerApplicationMaster(anyString(), anyInt(), anyString())).
+                                                   thenReturn(mockRegResponse);
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getAvailableResources()).
+                                              thenReturn(mockClusterResource);
+
+    scheduler.start();
+    drainableAppCallback.drain();
+
+    Object mockTask1 = mock(Object.class);
+    when(mockTask1.toString()).thenReturn("task1");
+    Object mockCookie1 = mock(Object.class);
+    Resource mockCapability = mock(Resource.class);
+    String[] hosts = {"host1", "host5"};
+    String[] racks = {"/default-rack", "/default-rack"};
+    final Priority mockPriority1 = Priority.newInstance(1);
+    final Priority mockPriority2 = Priority.newInstance(2);
+    final Priority mockPriority3 = Priority.newInstance(3);
+    final Priority mockPriority4 = Priority.newInstance(4);
+    final Priority mockPriority5 = Priority.newInstance(5);
+    Object mockTask2 = mock(Object.class);
+    when(mockTask2.toString()).thenReturn("task2");
+    Object mockCookie2 = mock(Object.class);
+    Object mockTask3 = mock(Object.class);
+    when(mockTask3.toString()).thenReturn("task3");
+    Object mockCookie3 = mock(Object.class);
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
+        ArgumentCaptor.forClass(CookieContainerRequest.class);
+
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
+        racks, mockPriority1, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request1 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask2, mockCapability, hosts,
+        racks, mockPriority2, null, mockCookie2);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(2)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request2 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask3, mockCapability, hosts,
+        racks, mockPriority3, null, mockCookie3);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(3)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request3 = requestCaptor.getValue();
+
+    List<Container> containers = new ArrayList<Container>();
+    // Send the lower-priority container first to make sure it's not matched.
+    Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+    when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+    when(mockContainer1.toString()).thenReturn("container1");
+    ContainerId mockCId1 = mock(ContainerId.class);
+    when(mockContainer1.getId()).thenReturn(mockCId1);
+    when(mockCId1.toString()).thenReturn("container1");
+    containers.add(mockContainer1);
+    Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+    when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+    when(mockContainer2.toString()).thenReturn("container2");
+    ContainerId mockCId2 = mock(ContainerId.class);
+    when(mockContainer2.getId()).thenReturn(mockCId2);
+    when(mockCId2.toString()).thenReturn("container2");
+    containers.add(mockContainer2);
+
+    ArrayList<CookieContainerRequest> hostContainers =
+                             new ArrayList<CookieContainerRequest>();
+    hostContainers.add(request1);
+    ArrayList<CookieContainerRequest> rackContainers =
+                             new ArrayList<CookieContainerRequest>();
+    rackContainers.add(request2);
+    ArrayList<CookieContainerRequest> anyContainers =
+                             new ArrayList<CookieContainerRequest>();
+    anyContainers.add(request3);
+
+    final List<ArrayList<CookieContainerRequest>> hostList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    hostList.add(hostContainers);
+    final List<ArrayList<CookieContainerRequest>> rackList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    rackList.add(rackContainers);
+    final List<ArrayList<CookieContainerRequest>> anyList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    anyList.add(anyContainers);
+    final List<ArrayList<CookieContainerRequest>> emptyList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    // return pri1 requests for host1
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return hostList;
+          }
+
+        });
+    // The second request is matched to the rack. RackResolver by default puts hosts in
+    // /default-rack, so we work around this by returning rack matches only once.
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return rackList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    // third request matched to ANY
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+
+    when(mockRMClient.getTopPriority()).then(
+        new Answer<Priority>() {
+          @Override
+          public Priority answer(
+              InvocationOnMock invocation) throws Throwable {
+            int allocations = drainableAppCallback.count.get();
+            if (allocations == 0) {
+              return mockPriority1;
+            }
+            if (allocations == 1) {
+              return mockPriority2;
+            }
+            if (allocations == 2) {
+              return mockPriority3;
+            }
+            if (allocations == 3) {
+              return mockPriority4;
+            }
+            return null;
+          }
+        });
+
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    scheduler.onContainersAllocated(containers);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+
+    Assert.assertEquals(2, scheduler.heldContainers.size());
+    Assert.assertEquals(1, scheduler.taskRequests.size());
+    // 2 containers are allocated and their corresponding taskRequests are removed.
+    verify(mockRMClient).removeContainerRequest(request1);
+    verify(mockRMClient).removeContainerRequest(request2);
+
+    scheduler.initiateStop();
+    // verify all the containers are released
+    Assert.assertEquals(0, scheduler.heldContainers.size());
+    verify(mockRMClient).releaseAssignedContainer(mockCId1);
+    verify(mockRMClient).releaseAssignedContainer(mockCId2);
+    // verify taskRequests are removed
+    Assert.assertEquals(0, scheduler.taskRequests.size());
+    verify(mockRMClient).removeContainerRequest(request3);
+  }
+
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerWithReuse() throws Exception {