You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/09 01:26:11 UTC
git commit: Use a separate thread for Driver.join() to avoid stalling the executor service.
Updated Branches:
refs/heads/master 05b27bc27 -> d801596fa
Use a separate thread for Driver.join() to avoid stalling the executor service.
The unit test changes were made to reproduce AURORA-25 (and happen to be more
true to real-world behavior).
Also, running the scheduler locally prior to this change would silently
reproduce this issue (the scheduler would never join the server set).
Bugs closed: AURORA-25
Reviewed at https://reviews.apache.org/r/16743/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/d801596f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/d801596f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/d801596f
Branch: refs/heads/master
Commit: d801596fabffd25c217d8a00b3102b46733d3687
Parents: 05b27bc
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 8 16:25:49 2014 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Wed Jan 8 16:25:49 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/SchedulerLifecycle.java | 11 +++++-
.../aurora/scheduler/SchedulerModule.java | 7 ++--
.../aurora/scheduler/async/AsyncModule.java | 3 +-
.../aurora/scheduler/app/SchedulerIT.java | 35 +++++++++++++++-----
4 files changed, 42 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d801596f/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 3ab73e6..247e940 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -32,6 +32,7 @@ import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Atomics;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.application.Lifecycle;
import com.twitter.common.base.Closure;
import com.twitter.common.base.Closures;
@@ -151,7 +152,15 @@ public class SchedulerLifecycle implements EventSubscriber {
@Override
public void blockingDriverJoin(Runnable runnable) {
- executorService.execute(runnable);
+ // We intentionally use an independent thread for this operation, since it blocks
+ // indefinitely. Using a separate thread allows us to inject an executor service with a safe
+ // expectation of operations that use minimal blocking.
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("BlockingDriverJoin")
+ .build()
+ .newThread(runnable)
+ .start();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d801596f/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index a5ce840..43f0c4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -90,9 +90,10 @@ public class SchedulerModule extends AbstractModule {
@Override protected void configure() {
bind(LeadingOptions.class).toInstance(
new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
- final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
- 1,
- new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
+
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
bind(ScheduledExecutorService.class).toInstance(executor);
bind(SchedulerLifecycle.class).in(Singleton.class);
expose(SchedulerLifecycle.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d801596f/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 106fbb3..6be658d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -197,8 +197,7 @@ public class AsyncModule extends AbstractModule {
LOG.warning("Preemptor Disabled.");
}
expose(PREEMPTOR_KEY);
- bind(new TypeLiteral<Amount<Long, Time>>() {
- }).annotatedWith(PreemptionDelay.class)
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
.toInstance(PREEMPTION_DELAY.get());
bind(TaskGroups.class).in(Singleton.class);
expose(TaskGroups.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d801596f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 2e682a1..9c0e32d 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -98,6 +98,7 @@ import org.apache.mesos.Protos.MasterInfo;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
+import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.junit.Before;
@@ -157,6 +158,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
});
driver = control.createMock(SchedulerDriver.class);
+ // This is necessary to allow driver to block, otherwise it would stall other mocks.
+ EasyMock.makeThreadSafe(driver, false);
+
driverFactory = control.createMock(DriverFactory.class);
log = control.createMock(Log.class);
logStream = control.createMock(Stream.class);
@@ -227,10 +231,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
});
}
- private Scheduler getScheduler() {
- return injector.getInstance(Scheduler.class);
- }
-
private HostAndPort awaitSchedulerReady() throws Exception {
return executor.submit(new Callable<HostAndPort>() {
@Override public HostAndPort call() throws Exception {
@@ -327,18 +327,37 @@ public class SchedulerIT extends BaseZooKeeperTest {
logStream.close();
expectLastCall().anyTimes();
- final Scheduler scheduler = injector.getInstance(Scheduler.class);
+ final CountDownLatch driverStarted = new CountDownLatch(1);
expect(driver.start()).andAnswer(new IAnswer<Status>() {
@Override public Status answer() {
- scheduler.registered(driver,
- FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
- MasterInfo.getDefaultInstance());
+ driverStarted.countDown();
return Status.DRIVER_RUNNING;
}
});
+ // Try to be a good test suite citizen by releasing the blocked thread when the test case exits.
+ final CountDownLatch testCompleted = new CountDownLatch(1);
+ expect(driver.join()).andAnswer(new IAnswer<Status>() {
+ @Override public Status answer() throws Throwable {
+ testCompleted.await();
+ return Status.DRIVER_STOPPED;
+ }
+ });
+ addTearDown(new TearDown() {
+ @Override public void tearDown() {
+ testCompleted.countDown();
+ }
+ });
+ expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes();
+
control.replay();
startScheduler();
+
+ driverStarted.await();
+ injector.getInstance(Scheduler.class).registered(driver,
+ FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
+ MasterInfo.getDefaultInstance());
+
awaitSchedulerReady();
assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());