You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/06 21:39:23 UTC

git commit: Ensure SchedulerActive event is dispatched before leader is advertised.

Updated Branches:
  refs/heads/master 704939645 -> 080124ffd


Ensure SchedulerActive event is dispatched before leader is advertised.

I dug into this after noticing SchedulerIT test flakiness (a deadlock) caused by
tearDown actions being executed while the SchedulerActive event was still being
dispatched. The test is arranged such that it expects the effects of that event
to occur before the test exits. The deadlock was related to EasyMock's default
global lock for mock answering, so that particular deadlock does not affect us in
production.

Reviewed at https://reviews.apache.org/r/16636/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/080124ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/080124ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/080124ff

Branch: refs/heads/master
Commit: 080124ffd30eed7470203ffd708f6a9df402ecaf
Parents: 7049396
Author: Bill Farner <wf...@apache.org>
Authored: Mon Jan 6 12:25:24 2014 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Mon Jan 6 12:25:24 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/SchedulerLifecycle.java    | 130 ++++++++++++++-----
 .../scheduler/SchedulerLifecycleTest.java       |   3 +
 .../aurora/scheduler/app/SchedulerIT.java       |   6 +-
 3 files changed, 101 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/080124ff/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index bf926a7..3ab73e6 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -121,8 +121,8 @@ public class SchedulerLifecycle implements EventSubscriber {
       Lifecycle lifecycle,
       Driver driver,
       DriverReference driverRef,
-      final LeadingOptions leadingOptions,
-      final ScheduledExecutorService executorService,
+      LeadingOptions leadingOptions,
+      ScheduledExecutorService executorService,
       Clock clock,
       EventSink eventSink) {
 
@@ -132,31 +132,52 @@ public class SchedulerLifecycle implements EventSubscriber {
         lifecycle,
         driver,
         driverRef,
-        new DelayedActions() {
-          @Override public void blockingDriverJoin(Runnable runnable) {
-            executorService.execute(runnable);
-          }
-
-          @Override public void onAutoFailover(Runnable runnable) {
-            executorService.schedule(
-                runnable,
-                leadingOptions.leadingTimeLimit.getValue(),
-                leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
-          }
-
-          @Override public void onRegistrationTimeout(Runnable runnable) {
-            LOG.info(
-                "Giving up on registration in " + leadingOptions.registrationDelayLimit);
-            executorService.schedule(
-                runnable,
-                leadingOptions.registrationDelayLimit.getValue(),
-                leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
-          }
-        },
+        new DefaultDelayedActions(leadingOptions, executorService),
         clock,
         eventSink);
   }
 
+  private static final class DefaultDelayedActions implements DelayedActions {
+    private final LeadingOptions leadingOptions;
+    private final ScheduledExecutorService executorService;
+
+    private DefaultDelayedActions(
+        LeadingOptions leadingOptions,
+        ScheduledExecutorService executorService) {
+
+      this.leadingOptions = checkNotNull(leadingOptions);
+      this.executorService = checkNotNull(executorService);
+    }
+
+    @Override
+    public void blockingDriverJoin(Runnable runnable) {
+      executorService.execute(runnable);
+    }
+
+    @Override
+    public void onAutoFailover(Runnable runnable) {
+      executorService.schedule(
+          runnable,
+          leadingOptions.leadingTimeLimit.getValue(),
+          leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
+    }
+
+    @Override
+    public void onRegistrationTimeout(Runnable runnable) {
+      LOG.info(
+          "Giving up on registration in " + leadingOptions.registrationDelayLimit);
+      executorService.schedule(
+          runnable,
+          leadingOptions.registrationDelayLimit.getValue(),
+          leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
+    }
+
+    @Override
+    public void onRegistered(Runnable runnable) {
+      executorService.submit(runnable);
+    }
+  }
+
   @VisibleForTesting
   SchedulerLifecycle(
       final DriverFactory driverFactory,
@@ -170,6 +191,15 @@ public class SchedulerLifecycle implements EventSubscriber {
       final Clock clock,
       final EventSink eventSink) {
 
+    checkNotNull(driverFactory);
+    checkNotNull(storage);
+    checkNotNull(lifecycle);
+    checkNotNull(driver);
+    checkNotNull(driverRef);
+    checkNotNull(delayedActions);
+    checkNotNull(clock);
+    checkNotNull(eventSink);
+
     Stats.export(new StatImpl<Integer>("framework_registered") {
       @Override public Integer read() {
         return registrationAcked.get() ? 1 : 0;
@@ -246,17 +276,47 @@ public class SchedulerLifecycle implements EventSubscriber {
     final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
       @Override public void execute(Transition<State> transition) {
         registrationAcked.set(true);
-        eventSink.post(new SchedulerActive());
-        try {
-          leaderControl.get().advertise();
-        } catch (JoinException e) {
-          LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
-          stateMachine.transition(State.DEAD);
-        } catch (InterruptedException e) {
-          LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
-          stateMachine.transition(State.DEAD);
-          Thread.currentThread().interrupt();
-        }
+
+        // This action sequence must be deferred due to a subtle detail of how guava's EventBus
+        // works. EventBus event handlers are guaranteed to not be reentrant, meaning that posting
+        // an event from an event handler will not dispatch in the same sequence as the calls to
+        // post().
+        // In short, this is to enforce a happens-before relationship between delivering
+        // SchedulerActive and advertising leadership. Without deferring, you end up with a call
+        // sequence like this:
+        //
+        // - Enter DriverRegistered handler
+        //   - Post SchedulerActive event
+        //   - Announce leadership
+        // - Exit DriverRegistered handler
+        // - Dispatch SchedulerActive to subscribers
+        //
+        // With deferring, we get this instead:
+        //
+        // - Enter DriverRegistered handler
+        // - Exit DriverRegistered handler
+        // (executor service dispatch delay)
+        // - Post SchedulerActive Event
+        //   - Dispatch SchedulerActive to subscribers
+        // - Announce leadership
+        //
+        // The latter is preferable since it makes it easier to reason about the state of an
+        // announced scheduler.
+        delayedActions.onRegistered(new Runnable() {
+          @Override public void run() {
+            eventSink.post(new SchedulerActive());
+            try {
+              leaderControl.get().advertise();
+            } catch (JoinException e) {
+              LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
+              stateMachine.transition(State.DEAD);
+            } catch (InterruptedException e) {
+              LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
+              stateMachine.transition(State.DEAD);
+              Thread.currentThread().interrupt();
+            }
+          }
+        });
       }
     };
 
@@ -417,5 +477,7 @@ public class SchedulerLifecycle implements EventSubscriber {
     void onAutoFailover(Runnable runnable);
 
     void onRegistrationTimeout(Runnable runnable);
+
+    void onRegistered(Runnable runnable);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/080124ff/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 24cd6fa..da7a167 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -102,6 +102,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
     delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
 
+    Capture<Runnable> handleRegisteredCapture = createCapture();
+    delayedActions.onRegistered(capture(handleRegisteredCapture));
     leaderControl.advertise();
     eventSink.post(new SchedulerActive());
     leaderControl.leave();
@@ -114,6 +116,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     LeadershipListener leaderListener = schedulerLifecycle.prepare();
     leaderListener.onLeading(leaderControl);
     schedulerLifecycle.registered(new DriverRegistered());
+    handleRegisteredCapture.getValue().run();
     triggerFailoverCapture.getValue().run();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/080124ff/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 41fd3a0..2e682a1 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -327,10 +327,10 @@ public class SchedulerIT extends BaseZooKeeperTest {
     logStream.close();
     expectLastCall().anyTimes();
 
-    final AtomicReference<Scheduler> scheduler = Atomics.newReference();
+    final Scheduler scheduler = injector.getInstance(Scheduler.class);
     expect(driver.start()).andAnswer(new IAnswer<Status>() {
       @Override public Status answer() {
-        scheduler.get().registered(driver,
+        scheduler.registered(driver,
             FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
             MasterInfo.getDefaultInstance());
         return Status.DRIVER_RUNNING;
@@ -339,8 +339,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
 
     control.replay();
     startScheduler();
-
-    scheduler.set(getScheduler());
     awaitSchedulerReady();
 
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());