You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 19:25:11 UTC

git commit: Only make the SchedulerDriver available after registered() has been called.

Updated Branches:
  refs/heads/master 479791381 -> 28372648a


Only make the SchedulerDriver available after registered() has been called.

I've also removed start() and join() from the Driver interface, since they
should be used exclusively by SchedulerLifecycle.

Bugs closed: AURORA-45

Reviewed at https://reviews.apache.org/r/16995/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/28372648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/28372648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/28372648

Branch: refs/heads/master
Commit: 28372648a364bcef17b9dffaa258081dbfc5b54f
Parents: 4797913
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 10:23:31 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 10:23:31 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/Driver.java     | 46 +++++------------
 .../aurora/scheduler/SchedulerLifecycle.java    | 54 ++++++++------------
 .../aurora/scheduler/SchedulerModule.java       |  8 +--
 .../org/apache/aurora/scheduler/DriverTest.java | 28 +++-------
 .../scheduler/SchedulerLifecycleTest.java       | 15 +++---
 5 files changed, 50 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
index 5339069..7af8721 100644
--- a/src/main/java/org/apache/aurora/scheduler/Driver.java
+++ b/src/main/java/org/apache/aurora/scheduler/Driver.java
@@ -16,19 +16,19 @@
 package org.apache.aurora.scheduler;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Atomics;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.StateMachine;
 
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.SchedulerDriver;
 
@@ -71,24 +71,14 @@ public interface Driver {
    */
   void stop();
 
-  /**
-   * Starts the underlying driver.  Can only be called once.
-   *
-   * @return The status of the underlying driver run request.
-   */
-  Protos.Status start();
-
-  /**
-   * Blocks until the underlying driver is stopped or aborted.
-   *
-   * @return The status of the underlying driver upon exit.
-   */
-  Protos.Status join();
+  interface SettableDriver extends Driver {
+    void initialize(SchedulerDriver driver);
+  }
 
   /**
    * Mesos driver implementation.
    */
-  static class DriverImpl implements Driver {
+  static class DriverImpl implements SettableDriver {
     private static final Logger LOG = Logger.getLogger(Driver.class.getName());
 
     /**
@@ -101,18 +91,15 @@ public interface Driver {
     }
 
     private final StateMachine<State> stateMachine;
-    private final Supplier<Optional<SchedulerDriver>> driverSupplier;
+    private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
     private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
 
     /**
      * Creates a driver manager that will only ask for the underlying mesos driver when actually
      * needed.
-     *
-     * @param driverSupplier A factory for the underlying driver.
      */
     @Inject
-    DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
-      this.driverSupplier = driverSupplier;
+    DriverImpl() {
       this.stateMachine =
           StateMachine.<State>builder("scheduler_driver")
               .initialState(State.INIT)
@@ -126,8 +113,7 @@ public interface Driver {
     private synchronized SchedulerDriver get(State expected) {
       // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
       stateMachine.checkState(expected);
-      // This will and should fail if the driver is not present.
-      return driverSupplier.get().get();
+      return Preconditions.checkNotNull(driverRef.get());
     }
 
     @Override
@@ -141,15 +127,11 @@ public interface Driver {
     }
 
     @Override
-    public Protos.Status start() {
-      SchedulerDriver driver = get(State.INIT);
+    public void initialize(SchedulerDriver driver) {
+      Preconditions.checkNotNull(driver);
+      stateMachine.checkState(State.INIT);
+      Preconditions.checkState(driverRef.compareAndSet(null, driver));
       stateMachine.transition(State.RUNNING);
-      return driver.start();
-    }
-
-    @Override
-    public Status join() {
-      return get(State.RUNNING).join();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index f8b6623..8304727 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -25,11 +25,9 @@ import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,6 +45,7 @@ import com.twitter.common.zookeeper.Group.JoinException;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 
+import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -115,13 +114,18 @@ public class SchedulerLifecycle implements EventSubscriber {
   private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
   private final StateMachine<State> stateMachine;
 
+  // The local driver reference, distinct from the global SettableDriver.
+  // This is used to perform actions with the driver (i.e. invoke start(), join()),
+  // which no other code should do.  It also permits us to save the reference until we are ready to
+  // make the driver ready by invoking SettableDriver.initialize().
+  private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
+
   @Inject
   SchedulerLifecycle(
       DriverFactory driverFactory,
       NonVolatileStorage storage,
       Lifecycle lifecycle,
-      Driver driver,
-      DriverReference driverRef,
+      SettableDriver driver,
       LeadingOptions leadingOptions,
       ScheduledExecutorService executorService,
       Clock clock,
@@ -132,7 +136,6 @@ public class SchedulerLifecycle implements EventSubscriber {
         storage,
         lifecycle,
         driver,
-        driverRef,
         new DefaultDelayedActions(leadingOptions, executorService),
         clock,
         eventSink);
@@ -192,10 +195,7 @@ public class SchedulerLifecycle implements EventSubscriber {
       final DriverFactory driverFactory,
       final NonVolatileStorage storage,
       final Lifecycle lifecycle,
-      // TODO(wfarner): The presence of Driver and DriverReference is quite confusing.  Figure out
-      //                a clean way to collapse the duties of DriverReference into DriverImpl.
-      final Driver driver,
-      final DriverReference driverRef,
+      final SettableDriver driver,
       final DelayedActions delayedActions,
       final Clock clock,
       final EventSink eventSink) {
@@ -204,7 +204,6 @@ public class SchedulerLifecycle implements EventSubscriber {
     checkNotNull(storage);
     checkNotNull(lifecycle);
     checkNotNull(driver);
-    checkNotNull(driverRef);
     checkNotNull(delayedActions);
     checkNotNull(clock);
     checkNotNull(eventSink);
@@ -244,6 +243,9 @@ public class SchedulerLifecycle implements EventSubscriber {
                 return storeProvider.getSchedulerStore().fetchFrameworkId();
               }
             });
+
+        // Save the prepared driver locally, but don't expose it until the registered callback is
+        // received.
         driverRef.set(driverFactory.apply(frameworkId));
 
         delayedActions.onRegistrationTimeout(
@@ -265,21 +267,22 @@ public class SchedulerLifecycle implements EventSubscriber {
               }
             });
 
-        Protos.Status status = driver.start();
+        Protos.Status status = driverRef.get().start();
         LOG.info("Driver started with code " + status);
-        delayedActions.blockingDriverJoin(new Runnable() {
-          @Override public void run() {
-            // Blocks until driver exits.
-            driver.join();
-            stateMachine.transition(State.DEAD);
-          }
-        });
       }
     };
 
     final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
       @Override public void execute(Transition<State> transition) {
         registrationAcked.set(true);
+        driver.initialize(driverRef.get());
+        delayedActions.blockingDriverJoin(new Runnable() {
+          @Override public void run() {
+            // Blocks until driver exits.
+            driverRef.get().join();
+            stateMachine.transition(State.DEAD);
+          }
+        });
 
         // This action sequence must be deferred due to a subtle detail of how guava's EventBus
         // works. EventBus event handlers are guaranteed to not be reentrant, meaning that posting
@@ -422,21 +425,6 @@ public class SchedulerLifecycle implements EventSubscriber {
     stateMachine.transition(State.REGISTERED_LEADER);
   }
 
-  /**
-   * Maintains a reference to the driver.
-   */
-  static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
-    private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
-
-    @Override public Optional<SchedulerDriver> get() {
-      return Optional.fromNullable(driver.get());
-    }
-
-    private void set(SchedulerDriver ref) {
-      driver.set(ref);
-    }
-  }
-
   private static class SchedulerCandidateImpl implements LeadershipListener {
     private final StateMachine<State> stateMachine;
     private final AtomicReference<LeaderControl> leaderControl;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 43f0c4e..aa3b383 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -22,27 +22,24 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.periodic.GcExecutorLauncher;
 import org.apache.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
 
 /**
  * Binding module for top-level scheduling logic.
@@ -70,9 +67,8 @@ public class SchedulerModule extends AbstractModule {
   @Override
   protected void configure() {
     bind(Driver.class).to(DriverImpl.class);
+    bind(SettableDriver.class).to(DriverImpl.class);
     bind(DriverImpl.class).in(Singleton.class);
-    bind(new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { }).to(DriverReference.class);
-    bind(DriverReference.class).in(Singleton.class);
 
     bind(Scheduler.class).to(MesosSchedulerImpl.class);
     bind(MesosSchedulerImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/test/java/org/apache/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverTest.java b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
index c3aa69c..d5e24b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/DriverTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
@@ -15,8 +15,6 @@
  */
 package org.apache.aurora.scheduler;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.Driver.DriverImpl;
@@ -28,7 +26,6 @@ import org.junit.Test;
 import static org.apache.mesos.Protos.Status.DRIVER_ABORTED;
 import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
 
 public class DriverTest extends EasyMockTest {
 
@@ -36,8 +33,6 @@ public class DriverTest extends EasyMockTest {
   private static final String TASK_2 = "2";
 
   private SchedulerDriver schedulerDriver;
-
-  private Supplier<Optional<SchedulerDriver>>  driverSupplier;
   private DriverImpl driver;
 
   private static Protos.TaskID createTaskId(String taskId) {
@@ -47,8 +42,7 @@ public class DriverTest extends EasyMockTest {
   @Before
   public void setUp() {
     schedulerDriver = createMock(SchedulerDriver.class);
-    driverSupplier = createMock(new Clazz<Supplier<Optional<SchedulerDriver>>>() { });
-    driver = new DriverImpl(driverSupplier);
+    driver = new DriverImpl();
   }
 
   @Test
@@ -60,37 +54,31 @@ public class DriverTest extends EasyMockTest {
 
   @Test
   public void testMultipleStops() {
-    expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
-    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.start());
+    driver.initialize(schedulerDriver);
     driver.stop();
     driver.stop();
   }
 
   @Test
   public void testStop() {
-    expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
-    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.start());
+    driver.initialize(schedulerDriver);
     driver.stop();
   }
 
   @Test
   public void testNormalLifecycle() {
-    expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(4);
-    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.start());
+    driver.initialize(schedulerDriver);
     driver.killTask(TASK_1);
     driver.killTask(TASK_2);
     driver.stop();
@@ -104,12 +92,10 @@ public class DriverTest extends EasyMockTest {
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testOnlyOneRunAllowed() {
-    expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver));
-    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
+  public void testOnlyOneSetAllowed() {
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.start());
-    driver.start();
+    driver.initialize(schedulerDriver);
+    driver.initialize(schedulerDriver);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/28372648/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index ce9b393..282cb02 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -24,8 +24,8 @@ import com.twitter.common.util.Clock;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
 
+import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
@@ -51,8 +51,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   private DriverFactory driverFactory;
   private StorageTestUtil storageUtil;
   private Command shutdownRegistry;
-  private Driver driver;
-  private DriverReference driverRef;
+  private SettableDriver driver;
   private LeaderControl leaderControl;
   private SchedulerDriver schedulerDriver;
   private DelayedActions delayedActions;
@@ -65,8 +64,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     driverFactory = createMock(DriverFactory.class);
     storageUtil = new StorageTestUtil(this);
     shutdownRegistry = createMock(Command.class);
-    driver = createMock(Driver.class);
-    driverRef = new DriverReference();
+    driver = createMock(SettableDriver.class);
     leaderControl = createMock(LeaderControl.class);
     schedulerDriver = createMock(SchedulerDriver.class);
     delayedActions = createMock(DelayedActions.class);
@@ -80,7 +78,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
           }
         }),
         driver,
-        driverRef,
         delayedActions,
         createMock(Clock.class),
         eventSink);
@@ -101,7 +98,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     Capture<Runnable> triggerFailoverCapture = createCapture();
     delayedActions.onAutoFailover(capture(triggerFailoverCapture));
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
-    expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+    driver.initialize(schedulerDriver);
+    expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
     delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
 
     Capture<Runnable> handleRegisteredCapture = createCapture();
@@ -131,8 +129,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
     delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
-    expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
-    delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+    expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
 
     // Important piece here is what's absent - leader presence is not advertised.
     leaderControl.leave();