You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 19:25:11 UTC
git commit: Only make the SchedulerDriver available after
registered() has been called.
Updated Branches:
refs/heads/master 479791381 -> 28372648a
Only make the SchedulerDriver available after registered() has been called.
I've also removed some functions from the Driver interface, since they should
be used exclusively by SchedulerLifecycle.
Bugs closed: AURORA-45
Reviewed at https://reviews.apache.org/r/16995/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/28372648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/28372648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/28372648
Branch: refs/heads/master
Commit: 28372648a364bcef17b9dffaa258081dbfc5b54f
Parents: 4797913
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 10:23:31 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 10:23:31 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/Driver.java | 46 +++++------------
.../aurora/scheduler/SchedulerLifecycle.java | 54 ++++++++------------
.../aurora/scheduler/SchedulerModule.java | 8 +--
.../org/apache/aurora/scheduler/DriverTest.java | 28 +++-------
.../scheduler/SchedulerLifecycleTest.java | 15 +++---
5 files changed, 50 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
index 5339069..7af8721 100644
--- a/src/main/java/org/apache/aurora/scheduler/Driver.java
+++ b/src/main/java/org/apache/aurora/scheduler/Driver.java
@@ -16,19 +16,19 @@
package org.apache.aurora.scheduler;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.inject.Inject;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Atomics;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.StateMachine;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.SchedulerDriver;
@@ -71,24 +71,14 @@ public interface Driver {
*/
void stop();
- /**
- * Starts the underlying driver. Can only be called once.
- *
- * @return The status of the underlying driver run request.
- */
- Protos.Status start();
-
- /**
- * Blocks until the underlying driver is stopped or aborted.
- *
- * @return The status of the underlying driver upon exit.
- */
- Protos.Status join();
+ interface SettableDriver extends Driver {
+ void initialize(SchedulerDriver driver);
+ }
/**
* Mesos driver implementation.
*/
- static class DriverImpl implements Driver {
+ static class DriverImpl implements SettableDriver {
private static final Logger LOG = Logger.getLogger(Driver.class.getName());
/**
@@ -101,18 +91,15 @@ public interface Driver {
}
private final StateMachine<State> stateMachine;
- private final Supplier<Optional<SchedulerDriver>> driverSupplier;
+ private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
/**
* Creates a driver manager that will only ask for the underlying mesos driver when actually
* needed.
- *
- * @param driverSupplier A factory for the underlying driver.
*/
@Inject
- DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
- this.driverSupplier = driverSupplier;
+ DriverImpl() {
this.stateMachine =
StateMachine.<State>builder("scheduler_driver")
.initialState(State.INIT)
@@ -126,8 +113,7 @@ public interface Driver {
private synchronized SchedulerDriver get(State expected) {
// TODO(William Farner): Formalize the failure case here by throwing a checked exception.
stateMachine.checkState(expected);
- // This will and should fail if the driver is not present.
- return driverSupplier.get().get();
+ return Preconditions.checkNotNull(driverRef.get());
}
@Override
@@ -141,15 +127,11 @@ public interface Driver {
}
@Override
- public Protos.Status start() {
- SchedulerDriver driver = get(State.INIT);
+ public void initialize(SchedulerDriver driver) {
+ Preconditions.checkNotNull(driver);
+ stateMachine.checkState(State.INIT);
+ Preconditions.checkState(driverRef.compareAndSet(null, driver));
stateMachine.transition(State.RUNNING);
- return driver.start();
- }
-
- @Override
- public Status join() {
- return get(State.RUNNING).join();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index f8b6623..8304727 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -25,11 +25,9 @@ import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,6 +45,7 @@ import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -115,13 +114,18 @@ public class SchedulerLifecycle implements EventSubscriber {
private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
private final StateMachine<State> stateMachine;
+ // The local driver reference, distinct from the global SettableDriver.
+ // This is used to perform actions with the driver (i.e. invoke start(), join()),
+ // which no other code should do. It also lets us hold the reference until we are ready to
+ // expose the driver to other code by invoking SettableDriver.initialize().
+ private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
+
@Inject
SchedulerLifecycle(
DriverFactory driverFactory,
NonVolatileStorage storage,
Lifecycle lifecycle,
- Driver driver,
- DriverReference driverRef,
+ SettableDriver driver,
LeadingOptions leadingOptions,
ScheduledExecutorService executorService,
Clock clock,
@@ -132,7 +136,6 @@ public class SchedulerLifecycle implements EventSubscriber {
storage,
lifecycle,
driver,
- driverRef,
new DefaultDelayedActions(leadingOptions, executorService),
clock,
eventSink);
@@ -192,10 +195,7 @@ public class SchedulerLifecycle implements EventSubscriber {
final DriverFactory driverFactory,
final NonVolatileStorage storage,
final Lifecycle lifecycle,
- // TODO(wfarner): The presence of Driver and DriverReference is quite confusing. Figure out
- // a clean way to collapse the duties of DriverReference into DriverImpl.
- final Driver driver,
- final DriverReference driverRef,
+ final SettableDriver driver,
final DelayedActions delayedActions,
final Clock clock,
final EventSink eventSink) {
@@ -204,7 +204,6 @@ public class SchedulerLifecycle implements EventSubscriber {
checkNotNull(storage);
checkNotNull(lifecycle);
checkNotNull(driver);
- checkNotNull(driverRef);
checkNotNull(delayedActions);
checkNotNull(clock);
checkNotNull(eventSink);
@@ -244,6 +243,9 @@ public class SchedulerLifecycle implements EventSubscriber {
return storeProvider.getSchedulerStore().fetchFrameworkId();
}
});
+
+ // Save the prepared driver locally, but don't expose it until the registered callback is
+ // received.
driverRef.set(driverFactory.apply(frameworkId));
delayedActions.onRegistrationTimeout(
@@ -265,21 +267,22 @@ public class SchedulerLifecycle implements EventSubscriber {
}
});
- Protos.Status status = driver.start();
+ Protos.Status status = driverRef.get().start();
LOG.info("Driver started with code " + status);
- delayedActions.blockingDriverJoin(new Runnable() {
- @Override public void run() {
- // Blocks until driver exits.
- driver.join();
- stateMachine.transition(State.DEAD);
- }
- });
}
};
final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
@Override public void execute(Transition<State> transition) {
registrationAcked.set(true);
+ driver.initialize(driverRef.get());
+ delayedActions.blockingDriverJoin(new Runnable() {
+ @Override public void run() {
+ // Blocks until driver exits.
+ driverRef.get().join();
+ stateMachine.transition(State.DEAD);
+ }
+ });
// This action sequence must be deferred due to a subtle detail of how guava's EventBus
// works. EventBus event handlers are guaranteed to not be reentrant, meaning that posting
@@ -422,21 +425,6 @@ public class SchedulerLifecycle implements EventSubscriber {
stateMachine.transition(State.REGISTERED_LEADER);
}
- /**
- * Maintains a reference to the driver.
- */
- static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
- private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
-
- @Override public Optional<SchedulerDriver> get() {
- return Optional.fromNullable(driver.get());
- }
-
- private void set(SchedulerDriver ref) {
- driver.set(ref);
- }
- }
-
private static class SchedulerCandidateImpl implements LeadershipListener {
private final StateMachine<State> stateMachine;
private final AtomicReference<LeaderControl> leaderControl;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 43f0c4e..aa3b383 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -22,27 +22,24 @@ import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Singleton;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.periodic.GcExecutorLauncher;
import org.apache.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
/**
* Binding module for top-level scheduling logic.
@@ -70,9 +67,8 @@ public class SchedulerModule extends AbstractModule {
@Override
protected void configure() {
bind(Driver.class).to(DriverImpl.class);
+ bind(SettableDriver.class).to(DriverImpl.class);
bind(DriverImpl.class).in(Singleton.class);
- bind(new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { }).to(DriverReference.class);
- bind(DriverReference.class).in(Singleton.class);
bind(Scheduler.class).to(MesosSchedulerImpl.class);
bind(MesosSchedulerImpl.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/test/java/org/apache/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverTest.java b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
index c3aa69c..d5e24b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/DriverTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
@@ -15,8 +15,6 @@
*/
package org.apache.aurora.scheduler;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.scheduler.Driver.DriverImpl;
@@ -28,7 +26,6 @@ import org.junit.Test;
import static org.apache.mesos.Protos.Status.DRIVER_ABORTED;
import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
public class DriverTest extends EasyMockTest {
@@ -36,8 +33,6 @@ public class DriverTest extends EasyMockTest {
private static final String TASK_2 = "2";
private SchedulerDriver schedulerDriver;
-
- private Supplier<Optional<SchedulerDriver>> driverSupplier;
private DriverImpl driver;
private static Protos.TaskID createTaskId(String taskId) {
@@ -47,8 +42,7 @@ public class DriverTest extends EasyMockTest {
@Before
public void setUp() {
schedulerDriver = createMock(SchedulerDriver.class);
- driverSupplier = createMock(new Clazz<Supplier<Optional<SchedulerDriver>>>() { });
- driver = new DriverImpl(driverSupplier);
+ driver = new DriverImpl();
}
@Test
@@ -60,37 +54,31 @@ public class DriverTest extends EasyMockTest {
@Test
public void testMultipleStops() {
- expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
- expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.start());
+ driver.initialize(schedulerDriver);
driver.stop();
driver.stop();
}
@Test
public void testStop() {
- expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
- expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.start());
+ driver.initialize(schedulerDriver);
driver.stop();
}
@Test
public void testNormalLifecycle() {
- expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(4);
- expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.start());
+ driver.initialize(schedulerDriver);
driver.killTask(TASK_1);
driver.killTask(TASK_2);
driver.stop();
@@ -104,12 +92,10 @@ public class DriverTest extends EasyMockTest {
}
@Test(expected = IllegalStateException.class)
- public void testOnlyOneRunAllowed() {
- expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver));
- expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
+ public void testOnlyOneSetAllowed() {
control.replay();
- assertEquals(DRIVER_RUNNING, driver.start());
- driver.start();
+ driver.initialize(schedulerDriver);
+ driver.initialize(schedulerDriver);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index ce9b393..282cb02 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -24,8 +24,8 @@ import com.twitter.common.util.Clock;
import com.twitter.common.zookeeper.SingletonService.LeaderControl;
import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
@@ -51,8 +51,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
private DriverFactory driverFactory;
private StorageTestUtil storageUtil;
private Command shutdownRegistry;
- private Driver driver;
- private DriverReference driverRef;
+ private SettableDriver driver;
private LeaderControl leaderControl;
private SchedulerDriver schedulerDriver;
private DelayedActions delayedActions;
@@ -65,8 +64,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
driverFactory = createMock(DriverFactory.class);
storageUtil = new StorageTestUtil(this);
shutdownRegistry = createMock(Command.class);
- driver = createMock(Driver.class);
- driverRef = new DriverReference();
+ driver = createMock(SettableDriver.class);
leaderControl = createMock(LeaderControl.class);
schedulerDriver = createMock(SchedulerDriver.class);
delayedActions = createMock(DelayedActions.class);
@@ -80,7 +78,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
}
}),
driver,
- driverRef,
delayedActions,
createMock(Clock.class),
eventSink);
@@ -101,7 +98,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
Capture<Runnable> triggerFailoverCapture = createCapture();
delayedActions.onAutoFailover(capture(triggerFailoverCapture));
delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
- expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+ driver.initialize(schedulerDriver);
+ expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
Capture<Runnable> handleRegisteredCapture = createCapture();
@@ -131,8 +129,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
- expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
- delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+ expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
// Important piece here is what's absent - leader presence is not advertised.
leaderControl.leave();