You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/21 23:20:09 UTC
tez git commit: TEZ-3000. Fix TestContainerReuse. (mingma)
Repository: tez
Updated Branches:
refs/heads/master de51d40e9 -> 92b20cc26
TEZ-3000. Fix TestContainerReuse. (mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92b20cc2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92b20cc2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92b20cc2
Branch: refs/heads/master
Commit: 92b20cc26d2ab3847922aca8077e1e303f8a597c
Parents: de51d40
Author: Ming Ma <mi...@twitter.com>
Authored: Wed Sep 21 16:19:54 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Wed Sep 21 16:19:54 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
tez-dag/findbugs-exclude.xml | 7 ++
.../dag/app/rm/YarnTaskSchedulerService.java | 16 +++-
.../tez/dag/app/rm/TestContainerReuse.java | 88 +++++---------------
.../dag/app/rm/TestTaskSchedulerHelpers.java | 16 ++--
5 files changed, 50 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfd4c2..9a24eac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3000. Fix TestContainerReuse.
TEZ-3436. Check input and output count before start in MapProcessor.
TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing
TEZ-3434. Add unit tests for flushing of recovery events.
@@ -111,6 +112,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3000. Fix TestContainerReuse.
TEZ-3436. Check input and output count before start in MapProcessor.
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
TEZ-3326. Display JVM system properties in AM and task logs.
http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 0f3cdca..5eed7eb 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -224,6 +224,13 @@
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
</Match>
+ <!-- TEZ-3000 -->
+ <Match>
+ <Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService$DelayedContainerManager"/>
+ <Method name="addDelayedContainer"/><Field name="drainedDelayedContainersForTest"/>
+ <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+ </Match>
+
<!-- TEZ-1981 -->
<Match>
<Class name="org.apache.tez.dag.app.dag.TaskAttempt$TaskAttemptStatus"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index bd4ac2f..6dfc7b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -134,9 +134,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
AtomicBoolean isStopStarted = new AtomicBoolean(false);
- private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
- private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
- private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
+ private ContainerAssigner NODE_LOCAL_ASSIGNER;
+ private ContainerAssigner RACK_LOCAL_ASSIGNER;
+ private ContainerAssigner NON_LOCAL_ASSIGNER;
DelayedContainerManager delayedContainerManager;
long localitySchedulingDelay;
@@ -339,6 +339,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0");
delayedContainerManager = new DelayedContainerManager();
+ NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
+ RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
+ NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
LOG.info("YarnTaskScheduler initialized with configuration: " +
"maxRMHeartbeatInterval: " + heartbeatIntervalMax +
", containerReuseEnabled: " + shouldReuseContainers +
@@ -674,7 +677,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
heldContainer.resetLocalityMatchLevel();
delayedContainerManager.addDelayedContainer(
heldContainer.getContainer(), currentTime
- + localitySchedulingDelay);
+ + localitySchedulingDelay);
}
} else if (state.equals(AMState.RUNNING_APP)) {
// clear min held containers since we need to allocate to tasks
@@ -2130,6 +2133,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
boolean added = false;
synchronized(this) {
added = delayedContainers.offer(delayedContainer);
+ if (drainedDelayedContainersForTest != null) {
+ synchronized (drainedDelayedContainersForTest) {
+ drainedDelayedContainersForTest.set(false);
+ }
+ }
this.notify();
}
if (!added) {
http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index a45f092..f21de3e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -191,17 +191,12 @@ public class TestContainerReuse {
createLaunchRequestEvent(taID31, ta31, resource, host1,
defaultRack, priority);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrTa11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrTa21);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container containerHost1 = createContainer(1, host1[0], resource, priority);
Container containerHost2 = createContainer(2, host2[0], resource, priority);
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(
Lists.newArrayList(containerHost1, containerHost2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -320,17 +315,12 @@ public class TestContainerReuse {
AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(
taID31, ta31, resource, host1, defaultRack, priority);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrTa11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrTa21);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container containerHost1 = createContainer(1, host1[0], resource, priority);
Container containerHost2 = createContainer(2, host2[0], resource, priority);
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -432,7 +422,6 @@ public class TestContainerReuse {
Container container1 = createContainer(1, "host1", resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -478,7 +467,6 @@ public class TestContainerReuse {
Container container2 = createContainer(2, "host2", resource1, priority1);
// Second container allocated. Should be allocated to the last task.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -573,17 +561,12 @@ public class TestContainerReuse {
AMSchedulerEventTALaunchRequest lrEvent2 =
createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent1);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent2);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -622,16 +605,11 @@ public class TestContainerReuse {
createLaunchRequestEvent(taID14, ta14, resource1, host2, racks,
priority1, localResources, tsLaunchCmdOpts);
- Container container2 = createContainer(2, "host2", resource1, priority1);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent3);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent4);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
// Container started
- drainNotifier.set(false);
+ Container container2 = createContainer(2, "host2", resource1, priority1);
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -675,11 +653,10 @@ public class TestContainerReuse {
priority1, localResources, taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 6));
// Container started
- Container container3 = createContainer(2, "host3", resource1, priority1);
+ Container container3 = createContainer(3, "host3", resource1, priority1);
taskSchedulerManager.handleEvent(lrEvent5);
taskSchedulerManager.handleEvent(lrEvent6);
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container3));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -772,14 +749,11 @@ public class TestContainerReuse {
taID12, ta12, resource1, emptyHosts, racks, priority);
// Send launch request for task 1 only, deterministic assignment to this task.
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "randomHost", resource1, priority);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -803,8 +777,8 @@ public class TestContainerReuse {
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
- LOG.info("Sleeping to ensure that the scheduling loop runs");
- Thread.sleep(3000l);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta12), any(Object.class), eq(container1));
@@ -812,9 +786,10 @@ public class TestContainerReuse {
taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
+
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- LOG.info("Sleeping to ensure that the scheduling loop runs");
- Thread.sleep(3000l);
+
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -831,7 +806,9 @@ public class TestContainerReuse {
tezConf.setLong(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
tezConf.setLong(
- TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
+ TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 20l);
+ tezConf.setLong(
+ TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 30l);
tezConf.setInt(
TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
@@ -900,14 +877,11 @@ public class TestContainerReuse {
taID21, ta21, resource1, host1, racks, priority2);
// Send launch request for task 1 onle, deterministic assignment to this task.
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, host1[0], resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -934,8 +908,9 @@ public class TestContainerReuse {
TaskAttemptState.SUCCEEDED, null, null, 0));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
- LOG.info("Sleeping to ensure that the scheduling loop runs");
- Thread.sleep(3000l);
+ LOG.info("Sleeping to ensure that the container has been idled longer " +
+ "than TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS ");
+ Thread.sleep(50l);
// container should not get released due to min held containers
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1014,17 +989,12 @@ public class TestContainerReuse {
TaskAttempt ta112 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent12);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -1078,11 +1048,9 @@ public class TestContainerReuse {
taskSchedulerManager.handleEvent(lrEvent21);
taskSchedulerManager.handleEvent(lrEvent22);
- drainableAppCallback.drain();
- // TODO This is terrible, need a better way to ensure the scheduling loop has run
- LOG.info("Sleeping to ensure that the scheduling loop runs");
- Thread.sleep(6000l);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class),
eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1204,18 +1172,13 @@ public class TestContainerReuse {
taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent11);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent12);
- TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
Container container2 = createContainer(2, "host1", resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
@@ -1247,10 +1210,9 @@ public class TestContainerReuse {
eventHandler.reset();
// Task 3
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent13);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-
+ drainableAppCallback.drain();
verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1265,10 +1227,9 @@ public class TestContainerReuse {
eventHandler.reset();
// Task 4
- drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent14);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
-
+ drainableAppCallback.drain();
verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1302,16 +1263,15 @@ public class TestContainerReuse {
host1, racks, priority1, v21LR);
taskSchedulerManager.handleEvent(lrEvent21);
- drainableAppCallback.drain();
- // TODO This is terrible, need a better way to ensure the scheduling loop has run
- LOG.info("Sleeping to ensure that the scheduling loop runs");
- Thread.sleep(6000l);
- drainNotifier.set(false);
- taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
- Thread.sleep(6000l);
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
+
+ taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
eventHandler.reset();
@@ -1364,7 +1324,6 @@ public class TestContainerReuse {
Resource resource1 = Resource.newInstance(1024, 1);
String[] host1 = {"host1"};
- String[] host2 = {"host2"};
String []racks = {"/default-rack"};
Priority priority1 = Priority.newInstance(1);
@@ -1381,7 +1340,6 @@ public class TestContainerReuse {
Container container1 = createContainer(1, "host1", resource1, priority1);
// One container allocated.
- drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
drainableAppCallback.drain();
verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11),
http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index ab85751..d8170e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -29,11 +29,12 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -47,7 +48,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -163,7 +163,7 @@ class TestTaskSchedulerHelpers {
TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper);
taskSchedulers[0] = new TaskSchedulerWrapper(
- new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync));
+ spy(new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync)));
taskSchedulerServiceWrappers[0] =
new ServicePluginLifecycleAbstractService(taskSchedulers[0].getTaskScheduler());
}
@@ -176,11 +176,8 @@ class TestTaskSchedulerHelpers {
public void serviceStart() {
instantiateSchedulers("host", 0, "", appContext);
// Init the service so that reuse configuration is picked up.
- ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
- ((AbstractService)taskSchedulerServiceWrappers[0]).start();
- // For some reason, the spy needs to be setup after sertvice startup.
- taskSchedulers[0] = new TaskSchedulerWrapper(spy(taskSchedulers[0].getTaskScheduler()));
-
+ taskSchedulerServiceWrappers[0].init(getConfig());
+ taskSchedulerServiceWrappers[0].start();
}
@Override
@@ -191,8 +188,7 @@ class TestTaskSchedulerHelpers {
@SuppressWarnings("rawtypes")
static class CapturingEventHandler implements EventHandler {
- private List<Event> events = new LinkedList<Event>();
-
+ private Queue<Event> events = new ConcurrentLinkedQueue<Event>();
public void handle(Event event) {
events.add(event);