You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/21 23:20:09 UTC

tez git commit: TEZ-3000. Fix TestContainerReuse. (mingma)

Repository: tez
Updated Branches:
  refs/heads/master de51d40e9 -> 92b20cc26


TEZ-3000. Fix TestContainerReuse. (mingma)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92b20cc2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92b20cc2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92b20cc2

Branch: refs/heads/master
Commit: 92b20cc26d2ab3847922aca8077e1e303f8a597c
Parents: de51d40
Author: Ming Ma <mi...@twitter.com>
Authored: Wed Sep 21 16:19:54 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Wed Sep 21 16:19:54 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 tez-dag/findbugs-exclude.xml                    |  7 ++
 .../dag/app/rm/YarnTaskSchedulerService.java    | 16 +++-
 .../tez/dag/app/rm/TestContainerReuse.java      | 88 +++++---------------
 .../dag/app/rm/TestTaskSchedulerHelpers.java    | 16 ++--
 5 files changed, 50 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfd4c2..9a24eac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3000. Fix TestContainerReuse.
   TEZ-3436. Check input and output count before start in MapProcessor.
   TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing
   TEZ-3434. Add unit tests for flushing of recovery events.
@@ -111,6 +112,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3000. Fix TestContainerReuse.
   TEZ-3436. Check input and output count before start in MapProcessor.
   TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
   TEZ-3326. Display JVM system properties in AM and task logs.

http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 0f3cdca..5eed7eb 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -224,6 +224,13 @@
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
   </Match>
 
+  <!-- TEZ-3000 -->
+  <Match>
+    <Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService$DelayedContainerManager"/>
+    <Method name="addDelayedContainer"/><Field name="drainedDelayedContainersForTest"/>
+    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+  </Match>
+
   <!-- TEZ-1981 -->
   <Match>
     <Class name="org.apache.tez.dag.app.dag.TaskAttempt$TaskAttemptStatus"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index bd4ac2f..6dfc7b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -134,9 +134,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
 
   AtomicBoolean isStopStarted = new AtomicBoolean(false);
 
-  private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
-  private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
-  private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
+  private ContainerAssigner NODE_LOCAL_ASSIGNER;
+  private ContainerAssigner RACK_LOCAL_ASSIGNER;
+  private ContainerAssigner NON_LOCAL_ASSIGNER;
 
   DelayedContainerManager delayedContainerManager;
   long localitySchedulingDelay;
@@ -339,6 +339,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
     Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0");
 
     delayedContainerManager = new DelayedContainerManager();
+    NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
+    RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
+    NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
     LOG.info("YarnTaskScheduler initialized with configuration: " +
             "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
             ", containerReuseEnabled: " + shouldReuseContainers +
@@ -674,7 +677,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
         heldContainer.resetLocalityMatchLevel();
         delayedContainerManager.addDelayedContainer(
             heldContainer.getContainer(), currentTime
-                + localitySchedulingDelay);        
+                + localitySchedulingDelay);
       }
     } else if (state.equals(AMState.RUNNING_APP)) {
       // clear min held containers since we need to allocate to tasks
@@ -2130,6 +2133,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
       boolean added =  false;
       synchronized(this) {
         added = delayedContainers.offer(delayedContainer);
+        if (drainedDelayedContainersForTest != null) {
+          synchronized (drainedDelayedContainersForTest) {
+            drainedDelayedContainersForTest.set(false);
+          }
+        }
         this.notify();
       }
       if (!added) {

http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index a45f092..f21de3e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -191,17 +191,12 @@ public class TestContainerReuse {
       createLaunchRequestEvent(taID31, ta31, resource, host1,
         defaultRack, priority);
 
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrTa11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrTa21);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(
       Lists.newArrayList(containerHost1, containerHost2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -320,17 +315,12 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(
       taID31, ta31, resource, host1, defaultRack, priority);
 
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrTa11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrTa21);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -432,7 +422,6 @@ public class TestContainerReuse {
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -478,7 +467,6 @@ public class TestContainerReuse {
     Container container2 = createContainer(2, "host2", resource1, priority1);
 
     // Second container allocated. Should be allocated to the last task.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -573,17 +561,12 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lrEvent2 =
         createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
 
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent1);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent2);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -622,16 +605,11 @@ public class TestContainerReuse {
         createLaunchRequestEvent(taID14, ta14, resource1, host2, racks,
             priority1, localResources, tsLaunchCmdOpts);
 
-    Container container2 = createContainer(2, "host2", resource1, priority1);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent3);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent4);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     // Container started
-    drainNotifier.set(false);
+    Container container2 = createContainer(2, "host2", resource1, priority1);
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -675,11 +653,10 @@ public class TestContainerReuse {
           priority1, localResources, taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 6));
 
     // Container started
-    Container container3 = createContainer(2, "host3", resource1, priority1);
+    Container container3 = createContainer(3, "host3", resource1, priority1);
     taskSchedulerManager.handleEvent(lrEvent5);
     taskSchedulerManager.handleEvent(lrEvent6);
 
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container3));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -772,14 +749,11 @@ public class TestContainerReuse {
       taID12, ta12, resource1, emptyHosts, racks, priority);
 
     // Send launch request for task 1 only, deterministic assignment to this task.
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "randomHost", resource1, priority);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -803,8 +777,8 @@ public class TestContainerReuse {
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
-    LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(3000l);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
     verify(taskSchedulerManager).taskAllocated(
         eq(0), eq(ta12), any(Object.class), eq(container1));
 
@@ -812,9 +786,10 @@ public class TestContainerReuse {
     taskSchedulerManager.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
         TaskAttemptState.SUCCEEDED, null, null, 0));
+
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(3000l);
+
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
@@ -831,7 +806,9 @@ public class TestContainerReuse {
     tezConf.setLong(
         TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
     tezConf.setLong(
-        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 20l);
+    tezConf.setLong(
+            TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 30l);
     tezConf.setInt(
         TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
 
@@ -900,14 +877,11 @@ public class TestContainerReuse {
       taID21, ta21, resource1, host1, racks, priority2);
 
     // Send launch request for task 1 onle, deterministic assignment to this task.
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, host1[0], resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -934,8 +908,9 @@ public class TestContainerReuse {
             TaskAttemptState.SUCCEEDED, null, null, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
-    LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(3000l);
+    LOG.info("Sleeping to ensure that the container has been idled longer " +
+        "than TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS ");
+    Thread.sleep(50l);
     // container should not get released due to min held containers
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
@@ -1014,17 +989,12 @@ public class TestContainerReuse {
     TaskAttempt ta112 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs);
 
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent12);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -1078,11 +1048,9 @@ public class TestContainerReuse {
 
     taskSchedulerManager.handleEvent(lrEvent21);
     taskSchedulerManager.handleEvent(lrEvent22);
-    drainableAppCallback.drain();
 
-    // TODO This is terrible, need a better way to ensure the scheduling loop has run
-    LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(6000l);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
     verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class),
         eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1204,18 +1172,13 @@ public class TestContainerReuse {
         taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
 
 
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent11);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent12);
-    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
     Container container2 = createContainer(2, "host1", resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
@@ -1247,10 +1210,9 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     // Task 3
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent13);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-
+    drainableAppCallback.drain();
     verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1265,10 +1227,9 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     // Task 4
-    drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent14);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-
+    drainableAppCallback.drain();
     verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1302,16 +1263,15 @@ public class TestContainerReuse {
         host1, racks, priority1, v21LR);
 
     taskSchedulerManager.handleEvent(lrEvent21);
-    drainableAppCallback.drain();
 
-    // TODO This is terrible, need a better way to ensure the scheduling loop has run
-    LOG.info("Sleeping to ensure that the scheduling loop runs");
-    Thread.sleep(6000l);
-    drainNotifier.set(false);
-    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
 
-    Thread.sleep(6000l);
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
+
+    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
     verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
     eventHandler.reset();
 
@@ -1364,7 +1324,6 @@ public class TestContainerReuse {
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
-    String[] host2 = {"host2"};
 
     String []racks = {"/default-rack"};
     Priority priority1 = Priority.newInstance(1);
@@ -1381,7 +1340,6 @@ public class TestContainerReuse {
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
     // One container allocated.
-    drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     drainableAppCallback.drain();
     verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11),

http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index ab85751..d8170e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -29,11 +29,12 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +48,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -163,7 +163,7 @@ class TestTaskSchedulerHelpers {
       TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper);
 
       taskSchedulers[0] = new TaskSchedulerWrapper(
-          new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync));
+          spy(new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync)));
       taskSchedulerServiceWrappers[0] =
           new ServicePluginLifecycleAbstractService(taskSchedulers[0].getTaskScheduler());
     }
@@ -176,11 +176,8 @@ class TestTaskSchedulerHelpers {
     public void serviceStart() {
       instantiateSchedulers("host", 0, "", appContext);
       // Init the service so that reuse configuration is picked up.
-      ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
-      ((AbstractService)taskSchedulerServiceWrappers[0]).start();
-      // For some reason, the spy needs to be setup after sertvice startup.
-      taskSchedulers[0] = new TaskSchedulerWrapper(spy(taskSchedulers[0].getTaskScheduler()));
-
+      taskSchedulerServiceWrappers[0].init(getConfig());
+      taskSchedulerServiceWrappers[0].start();
     }
 
     @Override
@@ -191,8 +188,7 @@ class TestTaskSchedulerHelpers {
   @SuppressWarnings("rawtypes")
   static class CapturingEventHandler implements EventHandler {
 
-    private List<Event> events = new LinkedList<Event>();
-
+    private Queue<Event> events = new ConcurrentLinkedQueue<Event>();
 
     public void handle(Event event) {
       events.add(event);