You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/04 02:14:34 UTC

[2/2] git commit: Replace StorageStarted event with TaskStateChange events.

Replace StorageStarted event with TaskStateChange events.

This change makes it easier to consume events about the state of tasks.
Previously, nearly every consumer of TaskStateChange also had to consume
StorageStarted and query the full storage.  With this change, TaskStateChange
is more intuitive and therefore less error-prone.

Reviewed at https://reviews.apache.org/r/16575/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/d0b46fc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/d0b46fc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/d0b46fc3

Branch: refs/heads/master
Commit: d0b46fc3b316a9bc56e616be05dac4a9570d30e5
Parents: 9f35586
Author: Bill Farner <wf...@apache.org>
Authored: Fri Jan 3 17:14:19 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Jan 3 17:14:19 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/MesosSchedulerImpl.java    |  14 ++-
 .../aurora/scheduler/SchedulerLifecycle.java    |  28 +++--
 .../org/apache/aurora/scheduler/TaskVars.java   |  31 ++----
 .../aurora/scheduler/async/HistoryPruner.java   |  33 +-----
 .../scheduler/async/RescheduleCalculator.java   |   3 +
 .../aurora/scheduler/async/TaskGroups.java      |  35 ++-----
 .../aurora/scheduler/async/TaskScheduler.java   |   2 +-
 .../aurora/scheduler/async/TaskTimeout.java     |  21 +---
 .../aurora/scheduler/events/EventSink.java      |  30 ++++++
 .../events/NotifyingMethodInterceptor.java      |  65 ------------
 .../events/NotifyingSchedulingFilter.java       |   7 +-
 .../aurora/scheduler/events/PubsubEvent.java    |  84 ++++++---------
 .../scheduler/events/PubsubEventModule.java     |  29 +-----
 .../aurora/scheduler/metadata/NearestFit.java   |   8 +-
 .../aurora/scheduler/state/CronJobManager.java  |   8 +-
 .../scheduler/state/MaintenanceController.java  |  40 +-------
 .../scheduler/state/SideEffectStorage.java      |  10 +-
 .../scheduler/state/StateManagerImpl.java       |   9 +-
 .../storage/CallOrderEnforcingStorage.java      |  25 ++++-
 .../scheduler/MesosSchedulerImplTest.java       |  15 ++-
 .../scheduler/SchedulerLifecycleTest.java       |   8 +-
 .../apache/aurora/scheduler/TaskVarsTest.java   |  67 +++++-------
 .../scheduler/async/HistoryPrunerTest.java      |  39 ++-----
 .../scheduler/async/TaskSchedulerImplTest.java  |  10 +-
 .../scheduler/async/TaskSchedulerTest.java      |  21 ++--
 .../aurora/scheduler/async/TaskTimeoutTest.java |  40 ++------
 .../events/NotifyingMethodInterceptorTest.java  | 101 -------------------
 .../events/NotifyingSchedulingFilterTest.java   |   7 +-
 .../scheduler/metadata/NearestFitTest.java      |   2 +-
 .../state/BaseSchedulerCoreImplTest.java        |   8 +-
 .../scheduler/state/CronJobManagerTest.java     |   9 +-
 .../state/MaintenanceControllerImplTest.java    |  37 ++-----
 .../aurora/scheduler/state/PubsubTestUtil.java  |   7 +-
 .../scheduler/state/StateManagerImplTest.java   |  18 ++--
 34 files changed, 268 insertions(+), 603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
index 9033922..2e7d7f7 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -35,8 +35,9 @@ import org.apache.aurora.gen.comm.SchedulerMessage;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -74,6 +75,7 @@ class MesosSchedulerImpl implements Scheduler {
   private final Storage storage;
   private final SchedulerCore schedulerCore;
   private final Lifecycle lifecycle;
+  private final EventSink eventSink;
   private volatile boolean registered = false;
 
   /**
@@ -88,12 +90,14 @@ class MesosSchedulerImpl implements Scheduler {
       Storage storage,
       SchedulerCore schedulerCore,
       final Lifecycle lifecycle,
-      List<TaskLauncher> taskLaunchers) {
+      List<TaskLauncher> taskLaunchers,
+      EventSink eventSink) {
 
     this.storage = checkNotNull(storage);
     this.schedulerCore = checkNotNull(schedulerCore);
     this.lifecycle = checkNotNull(lifecycle);
     this.taskLaunchers = checkNotNull(taskLaunchers);
+    this.eventSink = checkNotNull(eventSink);
   }
 
   @Override
@@ -101,7 +105,6 @@ class MesosSchedulerImpl implements Scheduler {
     LOG.info("Received notification of lost slave: " + slaveId);
   }
 
-  @SendNotification(after = Event.DriverRegistered)
   @Override
   public void registered(
       SchedulerDriver driver,
@@ -116,13 +119,14 @@ class MesosSchedulerImpl implements Scheduler {
       }
     });
     registered = true;
+    eventSink.post(new DriverRegistered());
   }
 
-  @SendNotification(after = Event.DriverDisconnected)
   @Override
   public void disconnected(SchedulerDriver schedulerDriver) {
     LOG.warning("Framework disconnected.");
     frameworkDisconnects.incrementAndGet();
+    eventSink.post(new DriverDisconnected());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index a54c342..bf926a7 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,10 @@ import com.twitter.common.zookeeper.Group.JoinException;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -95,7 +97,7 @@ public class SchedulerLifecycle implements EventSubscriber {
     STORAGE_PREPARED,
     LEADER_AWAITING_REGISTRATION,
     REGISTERED_LEADER,
-    RUNNING,
+    ACTIVE,
     DEAD
   }
 
@@ -114,14 +116,15 @@ public class SchedulerLifecycle implements EventSubscriber {
 
   @Inject
   SchedulerLifecycle(
-      final DriverFactory driverFactory,
-      final NonVolatileStorage storage,
-      final Lifecycle lifecycle,
-      final Driver driver,
-      final DriverReference driverRef,
+      DriverFactory driverFactory,
+      NonVolatileStorage storage,
+      Lifecycle lifecycle,
+      Driver driver,
+      DriverReference driverRef,
       final LeadingOptions leadingOptions,
       final ScheduledExecutorService executorService,
-      final Clock clock) {
+      Clock clock,
+      EventSink eventSink) {
 
     this(
         driverFactory,
@@ -150,7 +153,8 @@ public class SchedulerLifecycle implements EventSubscriber {
                 leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
           }
         },
-        clock);
+        clock,
+        eventSink);
   }
 
   @VisibleForTesting
@@ -163,7 +167,8 @@ public class SchedulerLifecycle implements EventSubscriber {
       final Driver driver,
       final DriverReference driverRef,
       final DelayedActions delayedActions,
-      final Clock clock) {
+      final Clock clock,
+      final EventSink eventSink) {
 
     Stats.export(new StatImpl<Integer>("framework_registered") {
       @Override public Integer read() {
@@ -241,6 +246,7 @@ public class SchedulerLifecycle implements EventSubscriber {
     final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
       @Override public void execute(Transition<State> transition) {
         registrationAcked.set(true);
+        eventSink.post(new SchedulerActive());
         try {
           leaderControl.get().advertise();
         } catch (JoinException e) {
@@ -306,9 +312,9 @@ public class SchedulerLifecycle implements EventSubscriber {
             State.REGISTERED_LEADER, State.DEAD)
         .addState(
             State.REGISTERED_LEADER,
-            State.RUNNING, State.DEAD)
+            State.ACTIVE, State.DEAD)
         .addState(
-            State.RUNNING,
+            State.ACTIVE,
             State.DEAD)
         .addState(
             State.DEAD,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 32ec939..d5d752e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -34,9 +34,8 @@ import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -53,12 +52,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 class TaskVars implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
 
-  // Used to ignore pubsub events sent before storage has completely started.  This avoids a
-  // miscount where a StorageStarted consumer is invoked before storageStarted is invoked here,
-  // and pubsub events are fired for tasks that we have not yet counted.  For example, if
-  // tasksDeleted is invoked, we would end up with a negative count.
-  private volatile boolean storageStarted = false;
-
   private final LoadingCache<String, AtomicLong> countersByStatus;
   private final LoadingCache<String, AtomicLong> countersByRack;
 
@@ -116,14 +109,12 @@ class TaskVars implements EventSubscriber {
 
   @Subscribe
   public void taskChangedState(TaskStateChange stateChange) {
-    if (!storageStarted) {
-      return;
-    }
-
     IScheduledTask task = stateChange.getTask();
-    if (stateChange.getOldState() != ScheduleStatus.INIT) {
-      decrementCount(stateChange.getOldState());
+    Optional<ScheduleStatus> previousState = stateChange.getOldState();
+    if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) {
+      decrementCount(previousState.get());
     }
+
     incrementCount(task.getStatus());
 
     if (stateChange.getNewState() == ScheduleStatus.LOST) {
@@ -146,10 +137,9 @@ class TaskVars implements EventSubscriber {
   }
 
   @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
-      incrementCount(task.getStatus());
-    }
+  public void schedulerActive(SchedulerActive event) {
+    // TODO(wfarner): This should probably induce the initial 'export' of stats, so that incomplete
+    // values are not surfaced while storage is recovering.
 
     // Dummy read the counter for each status counter. This is important to guarantee a stat with
     // value zero is present for each state, even if all states are not represented in the task
@@ -157,15 +147,10 @@ class TaskVars implements EventSubscriber {
     for (ScheduleStatus status : ScheduleStatus.values()) {
       getCounter(status);
     }
-    storageStarted = true;
   }
 
   @Subscribe
   public void tasksDeleted(final TasksDeleted event) {
-    if (!storageStarted) {
-      return;
-    }
-
     for (IScheduledTask task : event.getTasks()) {
       decrementCount(task.getStatus());
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index 462e0df..9f77bfc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -41,10 +41,8 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -55,9 +53,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import static org.apache.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
 import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 
@@ -68,9 +64,6 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 public class HistoryPruner implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
 
-  @VisibleForTesting
-  static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
-
   private final Multimap<IJobKey, String> tasksByJob =
       Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
   @VisibleForTesting
@@ -79,7 +72,6 @@ public class HistoryPruner implements EventSubscriber {
   }
 
   private final ScheduledExecutorService executor;
-  private final Storage storage;
   private final StateManager stateManager;
   private final Clock clock;
   private final long pruneThresholdMillis;
@@ -93,14 +85,12 @@ public class HistoryPruner implements EventSubscriber {
   @Inject
   HistoryPruner(
       final ScheduledExecutorService executor,
-      final Storage storage,
       final StateManager stateManager,
       final Clock clock,
       @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
       @PruneThreshold int perJobHistoryGoal) {
 
     this.executor = checkNotNull(executor);
-    this.storage = checkNotNull(storage);
     this.stateManager = checkNotNull(stateManager);
     this.clock = checkNotNull(clock);
     this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
@@ -120,28 +110,13 @@ public class HistoryPruner implements EventSubscriber {
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
     if (Tasks.isTerminated(change.getNewState())) {
+      long timeoutBasis = change.isTransition()
+          ? clock.nowMillis()
+          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
       registerInactiveTask(
           Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
           change.getTaskId(),
-          calculateTimeout(clock.nowMillis()));
-    }
-  }
-
-  /**
-   * When triggered, iterates through inactive tasks in the system and prunes tasks that
-   * exceed the history goal for a job or are beyond the time threshold.
-   *
-   * @param event A new StorageStarted event.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task
-        : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
-
-      registerInactiveTask(
-          Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
-          Tasks.id(task),
-          calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
+          calculateTimeout(timeoutBasis));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index fb4d2b9..0265bf9 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -68,6 +68,8 @@ interface RescheduleCalculator {
    */
   long getReadyTimeMs(IScheduledTask task);
 
+  // TODO(wfarner): Create a unit test for this class.  It currently piggybacks on
+  // TaskSchedulerTest.  Once a unit test exists, TaskSchedulerTest should use a mock.
   class RescheduleCalculatorImpl implements RescheduleCalculator {
 
     private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
@@ -75,6 +77,7 @@ interface RescheduleCalculator {
     private final Storage storage;
     private final RescheduleCalculatorSettings settings;
     private final Clock clock;
+    // TODO(wfarner): Inject 'random' in the constructor for better test coverage.
     private final Random random = new Random.SystemRandom(new java.util.Random());
 
     private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 1119344..b50c625 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -41,13 +42,10 @@ import com.twitter.common.util.Clock;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -70,7 +68,6 @@ public class TaskGroups implements EventSubscriber {
 
   private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
 
-  private final Storage storage;
   private final LoadingCache<GroupKey, TaskGroup> groups;
   private final Clock clock;
   private final RescheduleCalculator rescheduleCalculator;
@@ -88,7 +85,6 @@ public class TaskGroups implements EventSubscriber {
   @Inject
   TaskGroups(
       ShutdownRegistry shutdownRegistry,
-      Storage storage,
       TaskGroupsSettings settings,
       TaskScheduler taskScheduler,
       Clock clock,
@@ -96,7 +92,6 @@ public class TaskGroups implements EventSubscriber {
 
     this(
         createThreadPool(shutdownRegistry),
-        storage,
         settings.taskGroupBackoff,
         settings.rateLimiter,
         taskScheduler,
@@ -104,16 +99,15 @@ public class TaskGroups implements EventSubscriber {
         rescheduleCalculator);
   }
 
+  @VisibleForTesting
   TaskGroups(
       final ScheduledExecutorService executor,
-      final Storage storage,
       final BackoffStrategy taskGroupBackoffStrategy,
       final RateLimiter rateLimiter,
       final TaskScheduler taskScheduler,
       final Clock clock,
       final RescheduleCalculator rescheduleCalculator) {
 
-    this.storage = checkNotNull(storage);
     checkNotNull(executor);
     checkNotNull(taskGroupBackoffStrategy);
     checkNotNull(rateLimiter);
@@ -222,26 +216,11 @@ public class TaskGroups implements EventSubscriber {
   @Subscribe
   public synchronized void taskChangedState(TaskStateChange stateChange) {
     if (stateChange.getNewState() == PENDING) {
-      add(
-          stateChange.getTask().getAssignedTask(),
-          rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
-    }
-  }
-
-  /**
-   * Signals that storage has started and is consistent.
-   * <p>
-   * Upon this signal, all {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks in the stoage
-   * will become eligible for scheduling.
-   *
-   * @param event Storage started notification.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task
-        : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
-
-      add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
+      IScheduledTask task = stateChange.getTask();
+      long readyAtMs = stateChange.isTransition()
+          ? rescheduleCalculator.getReadyTimeMs(task)
+          : rescheduleCalculator.getStartupReadyTimeMs(task);
+      add(task.getAssignedTask(), readyAtMs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 4a91d9f..96c76ba 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -210,7 +210,7 @@ interface TaskScheduler extends EventSubscriber {
 
     @Subscribe
     public void taskChanged(final TaskStateChange stateChangeEvent) {
-      if (stateChangeEvent.getOldState() == PENDING) {
+      if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
         reservations.invalidateTask(stateChangeEvent.getTaskId());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 046befb..9f14a99 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -43,13 +43,9 @@ import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -76,9 +72,6 @@ class TaskTimeout implements EventSubscriber {
       ScheduleStatus.RESTARTING,
       ScheduleStatus.KILLING);
 
-  @VisibleForTesting
-  static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
-
   private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
 
   private static final class TimeoutKey {
@@ -111,7 +104,6 @@ class TaskTimeout implements EventSubscriber {
     }
   }
 
-  private final Storage storage;
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final long timeoutMillis;
@@ -120,14 +112,12 @@ class TaskTimeout implements EventSubscriber {
 
   @Inject
   TaskTimeout(
-      Storage storage,
       ScheduledExecutorService executor,
       StateManager stateManager,
       final Clock clock,
       Amount<Long, Time> timeout,
       StatsProvider statsProvider) {
 
-    this.storage = checkNotNull(storage);
     this.executor = checkNotNull(executor);
     this.stateManager = checkNotNull(stateManager);
     this.timeoutMillis = timeout.as(Time.MILLISECONDS);
@@ -162,8 +152,8 @@ class TaskTimeout implements EventSubscriber {
   public void recordStateChange(TaskStateChange change) {
     String taskId = change.getTaskId();
     ScheduleStatus newState = change.getNewState();
-    if (isTransient(change.getOldState())) {
-      TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState());
+    if (change.isTransition() && isTransient(change.getOldState().get())) {
+      TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState().get());
       Context context = futures.remove(oldKey);
       if (context != null) {
         LOG.fine("Canceling state timeout for task " + oldKey);
@@ -176,13 +166,6 @@ class TaskTimeout implements EventSubscriber {
     }
   }
 
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
-      registerTimeout(new TimeoutKey(Tasks.id(task), task.getStatus()));
-    }
-  }
-
   private class TimedOutTaskHandler implements Runnable {
     private final TimeoutKey key;
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/EventSink.java b/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
new file mode 100644
index 0000000..4fc425e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.events;
+
+/**
+ * A publishing channel for pubsub events, which will cause the event to be announced to relevant
+ * subscribers.
+ */
+public interface EventSink {
+
+  /**
+   * Posts an event.
+   *
+   * @param event Event to announce.
+   */
+  void post(PubsubEvent event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
deleted file mode 100644
index 8003262..0000000
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.events;
-
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.base.Closure;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-
-/**
- * A method interceptor that sends pubsub notifications before and/or after a method annotated
- * with {@link SendNotification}
- * is invoked.
- */
-class NotifyingMethodInterceptor implements MethodInterceptor {
-  private static final Logger LOG = Logger.getLogger(NotifyingMethodInterceptor.class.getName());
-
-  @Inject
-  private Closure<PubsubEvent> eventSink;
-
-  private void maybeFire(Event event) {
-    if (event != Event.None) {
-      eventSink.execute(event.getEvent());
-    }
-  }
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    Preconditions.checkNotNull(eventSink, "Event sink has not yet been set.");
-
-    Method method = invocation.getMethod();
-    SendNotification sendNotification = method.getAnnotation(SendNotification.class);
-    if (sendNotification == null) {
-      LOG.warning("Interceptor should not match methods without @"
-          + SendNotification.class.getSimpleName());
-      return invocation.proceed();
-    }
-
-    maybeFire(sendNotification.before());
-    Object result = invocation.proceed();
-    maybeFire(sendNotification.after());
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index e5ab284..c7f4a1b 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -22,7 +22,6 @@ import java.util.Set;
 import javax.inject.Inject;
 
 import com.google.inject.BindingAnnotation;
-import com.twitter.common.base.Closure;
 
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
@@ -49,12 +48,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   public @interface NotifyDelegate { }
 
   private final SchedulingFilter delegate;
-  private final Closure<PubsubEvent> eventSink;
+  private final EventSink eventSink;
 
   @Inject
   NotifyingSchedulingFilter(
       @NotifyDelegate SchedulingFilter delegate,
-      Closure<PubsubEvent> eventSink) {
+      EventSink eventSink) {
 
     this.delegate = checkNotNull(delegate);
     this.eventSink = checkNotNull(eventSink);
@@ -64,7 +63,7 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
     Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId);
     if (!vetoes.isEmpty()) {
-      eventSink.execute(new Vetoed(taskId, vetoes));
+      eventSink.post(new Vetoed(taskId, vetoes));
     }
 
     return vetoes;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 971f40c..59e18ea 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -15,11 +15,10 @@
  */
 package org.apache.aurora.scheduler.events;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.util.Set;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 
 import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -27,9 +26,6 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -83,23 +79,48 @@ public interface PubsubEvent {
   /**
    * Event sent when a task changed state.
    */
-  public static class TaskStateChange implements PubsubEvent {
+  public static final class TaskStateChange implements PubsubEvent {
     private final IScheduledTask task;
-    private final ScheduleStatus oldState;
+    private final Optional<ScheduleStatus> oldState;
 
-    public TaskStateChange(IScheduledTask task, ScheduleStatus oldState) {
+    private TaskStateChange(IScheduledTask task, Optional<ScheduleStatus> oldState) {
       this.task = checkNotNull(task);
       this.oldState = checkNotNull(oldState);
     }
 
+    /**
+     * Creates a state change event that represents the initial value of a task.
+     *
+     * @param task Task structure.
+     * @return A state change event.
+     */
+    public static TaskStateChange initialized(IScheduledTask task) {
+      return new TaskStateChange(task, Optional.<ScheduleStatus>absent());
+    }
+
+    /**
+     * Creates a state change event that represents a transition from one state to another.
+     *
+     * @param task Current task structure.
+     * @param oldState State the task was previously in.
+     * @return A state change event.
+     */
+    public static TaskStateChange transition(IScheduledTask task, ScheduleStatus oldState) {
+      return new TaskStateChange(task, Optional.of(oldState));
+    }
+
     public String getTaskId() {
       return Tasks.id(task);
     }
 
-    public ScheduleStatus getOldState() {
+    public Optional<ScheduleStatus> getOldState() {
       return oldState;
     }
 
+    public boolean isTransition() {
+      return oldState.isPresent();
+    }
+
     public IScheduledTask getTask() {
       return task;
     }
@@ -242,7 +263,7 @@ public interface PubsubEvent {
     }
   }
 
-  public static class StorageStarted implements PubsubEvent {
+  public static class DriverRegistered implements PubsubEvent {
     @Override
     public boolean equals(Object o) {
       return (o != null) && getClass().equals(o.getClass());
@@ -254,7 +275,7 @@ public interface PubsubEvent {
     }
   }
 
-  public static class DriverRegistered implements PubsubEvent {
+  public static class DriverDisconnected implements PubsubEvent {
     @Override
     public boolean equals(Object o) {
       return (o != null) && getClass().equals(o.getClass());
@@ -266,7 +287,7 @@ public interface PubsubEvent {
     }
   }
 
-  public static class DriverDisconnected implements PubsubEvent {
+  public static class SchedulerActive implements PubsubEvent {
     @Override
     public boolean equals(Object o) {
       return (o != null) && getClass().equals(o.getClass());
@@ -277,43 +298,4 @@ public interface PubsubEvent {
       return getClass().hashCode();
     }
   }
-
-  public static final class Interceptors {
-    private Interceptors() {
-      // Utility class.
-    }
-
-    public enum Event {
-      None(null),
-      StorageStarted(new StorageStarted()),
-      DriverRegistered(new DriverRegistered()),
-      DriverDisconnected(new DriverDisconnected());
-
-      private final PubsubEvent event;
-      private Event(PubsubEvent event) {
-        this.event = event;
-      }
-
-      public PubsubEvent getEvent() {
-        return event;
-      }
-    }
-
-    /**
-     * An annotation to place on methods of injected classes that which to fire events before
-     * and/or after their invocation.
-     */
-    @Target(METHOD) @Retention(RUNTIME)
-    public @interface SendNotification {
-      /**
-       * Event to fire prior to invocation.
-       */
-      Event before() default Event.None;
-
-      /**
-       * Event to fire after invocation.
-       */
-      Event after() default Event.None;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 1260a16..ebdede9 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -27,17 +27,12 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matchers;
 import com.google.inject.multibindings.Multibinder;
 import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.Closure;
 import com.twitter.common.base.Command;
 
-import org.aopalliance.intercept.MethodInterceptor;
 import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -69,17 +64,16 @@ public final class PubsubEventModule extends AbstractModule {
 
     bind(EventBus.class).toInstance(eventBus);
 
-    Closure<PubsubEvent> eventPoster = new Closure<PubsubEvent>() {
-      @Override public void execute(PubsubEvent event) {
+    EventSink eventSink = new EventSink() {
+      @Override public void post(PubsubEvent event) {
         eventBus.post(event);
       }
     };
-    bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventPoster);
+    bind(EventSink.class).toInstance(eventSink);
 
     // Ensure at least an empty binding is present.
     getSubscriberBinder(binder());
     LifecycleModule.bindStartupAction(binder(), RegisterSubscribers.class);
-    bindNotifyingInterceptor(binder());
   }
 
   static class RegisterSubscribers implements Command {
@@ -126,21 +120,4 @@ public final class PubsubEventModule extends AbstractModule {
   public static void bindSubscriber(Binder binder, Class<? extends EventSubscriber> subscriber) {
     getSubscriberBinder(binder).addBinding().to(subscriber);
   }
-
-  /**
-   * Binds a method interceptor to all methods annotated with {@link SendNotification}.
-   * <p>
-   * The interceptor will send notifications before and/or after the wrapped method invocation.
-   *
-   * @param binder Guice binder.
-   */
-  @VisibleForTesting
-  public static void bindNotifyingInterceptor(Binder binder) {
-    MethodInterceptor interceptor = new NotifyingMethodInterceptor();
-    binder.requestInjection(interceptor);
-    binder.bindInterceptor(
-        Matchers.any(),
-        Matchers.annotatedWith(SendNotification.class),
-        interceptor);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
index f801f80..96fcbc4 100644
--- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -96,12 +96,12 @@ public class NearestFit implements EventSubscriber {
    * Records a task state change event.
    * This will ignore any events where the previous state is not {@link ScheduleStatus#PENDING}.
    *
-   * @param stateChangeEvent Task state change.
+   * @param event Task state change.
    */
   @Subscribe
-  public synchronized void stateChanged(TaskStateChange stateChangeEvent) {
-    if (stateChangeEvent.getOldState() == ScheduleStatus.PENDING) {
-      fitByTask.invalidate(stateChangeEvent.getTaskId());
+  public synchronized void stateChanged(TaskStateChange event) {
+    if (event.isTransition() && event.getOldState().get() == ScheduleStatus.PENDING) {
+      fitByTask.invalidate(event.getTaskId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index e1773e9..b772712 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -61,7 +61,7 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -165,13 +165,13 @@ public class CronJobManager extends JobManager implements EventSubscriber {
   }
 
   /**
-   * Notifies the cron job manager that storage is started, and job configurations are ready to
+   * Notifies the cron job manager that the scheduler is active, and job configurations are ready to
    * load.
    *
-   * @param storageStarted Event.
+   * @param schedulerActive Event.
    */
   @Subscribe
-  public void storageStarted(StorageStarted storageStarted) {
+  public void schedulerActive(SchedulerActive schedulerActive) {
     cron.start();
     shutdownRegistry.addAction(new ExceptionalCommand<CronException>() {
       @Override public void execute() throws CronException {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 007369c..3dd2271 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -22,14 +22,12 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
 
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.HostStatus;
@@ -37,9 +35,9 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -108,13 +106,13 @@ public interface MaintenanceController {
   class MaintenanceControllerImpl implements MaintenanceController, EventSubscriber {
     private final Storage storage;
     private final StateManager stateManager;
-    private final Closure<PubsubEvent> eventSink;
+    private final EventSink eventSink;
 
     @Inject
     public MaintenanceControllerImpl(
         Storage storage,
         StateManager stateManager,
-        Closure<PubsubEvent> eventSink) {
+        EventSink eventSink) {
 
       this.storage = checkNotNull(storage);
       this.stateManager = checkNotNull(stateManager);
@@ -146,36 +144,6 @@ public interface MaintenanceController {
           .build();
     }
 
-    private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
-      return watchDrainingTasks(store, hosts, Closures.<Query.Builder>noop());
-    }
-
-    private static final Predicate<HostAttributes> IS_DRAINING = new Predicate<HostAttributes>() {
-      @Override public boolean apply(HostAttributes attributes) {
-        return DRAINING == attributes.getMode();
-      }
-    };
-
-    /**
-     * Notifies the MaintenanceController that storage has started, and maintenance statuses are
-     * ready to be loaded.
-     *
-     * @param started Event.
-     */
-    @Subscribe
-    public void storageStarted(StorageStarted started) {
-      storage.write(new MutateWork.NoResult.Quiet() {
-        @Override protected void execute(MutableStoreProvider storeProvider) {
-          Set<String> drainingHosts =
-              FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
-                  .filter(IS_DRAINING)
-                  .transform(HOST_NAME)
-                  .toSet();
-          watchDrainingTasks(storeProvider, drainingHosts);
-        }
-      });
-    }
-
     /**
      * Notifies the MaintenanceController that a task has changed state
      *
@@ -292,7 +260,7 @@ public interface MaintenanceController {
       for (String host : hosts) {
         if (store.setMaintenanceMode(host, mode)) {
           HostStatus status = new HostStatus().setHost(host).setMode(mode);
-          eventSink.execute(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
+          eventSink.post(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
           statuses.add(status);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
index 46e1568..2bdd459 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.twitter.common.base.Closure;
 
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -47,7 +47,7 @@ class SideEffectStorage {
 
   private final Storage storage;
   private final OperationFinalizer operationFinalizer;
-  private final Closure<PubsubEvent> taskEventSink;
+  private final EventSink eventSink;
 
   interface OperationFinalizer {
     /**
@@ -68,11 +68,11 @@ class SideEffectStorage {
   SideEffectStorage(
       Storage storage,
       OperationFinalizer operationFinalizer,
-      Closure<PubsubEvent> taskEventSink) {
+      EventSink eventSink) {
 
     this.storage = checkNotNull(storage);
     this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.taskEventSink = checkNotNull(taskEventSink);
+    this.eventSink = checkNotNull(eventSink);
   }
 
   /**
@@ -154,7 +154,7 @@ class SideEffectStorage {
           operationFinalizer.finalize(work, storeProvider);
           if (topLevelOperation) {
             while (!events.isEmpty()) {
-              taskEventSink.execute(events.remove());
+              eventSink.post(events.remove());
             }
           }
           return result;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 0024222..b6dd537 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -38,7 +38,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.base.Closure;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
@@ -49,6 +48,7 @@ import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -157,7 +157,7 @@ public class StateManagerImpl implements StateManager {
       final Clock clock,
       Driver driver,
       TaskIdGenerator taskIdGenerator,
-      Closure<PubsubEvent> taskEventSink) {
+      EventSink eventSink) {
 
     checkNotNull(storage);
     this.clock = checkNotNull(clock);
@@ -168,8 +168,7 @@ public class StateManagerImpl implements StateManager {
       }
     };
 
-    this.storage = new SideEffectStorage(storage, finalizer, taskEventSink);
-
+    this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
 
@@ -369,7 +368,7 @@ public class StateManagerImpl implements StateManager {
               }
             });
             sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskStateChange(
+                PubsubEvent.TaskStateChange.transition(
                     Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
                     stateMachine.getPreviousState()));
             break;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index cdc7ce6..a556ae2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -28,10 +28,15 @@ import com.google.inject.Module;
 import com.google.inject.PrivateModule;
 import com.twitter.common.util.StateMachine;
 
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * A non-volatile storage wrapper that enforces method call ordering.
@@ -47,6 +52,7 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
   private @interface EnforceOrderOn { }
 
   private final NonVolatileStorage wrapped;
+  private final EventSink eventSink;
 
   private enum State {
     CONSTRUCTED,
@@ -64,8 +70,9 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
       .build();
 
   @Inject
-  CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped) {
-    this.wrapped = wrapped;
+  CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped, EventSink eventSink) {
+    this.wrapped = checkNotNull(wrapped);
+    this.eventSink = checkNotNull(eventSink);
   }
 
   private void checkInState(State state) throws StorageException {
@@ -81,12 +88,20 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
     stateMachine.transition(State.PREPARED);
   }
 
-  @SendNotification(after = Event.StorageStarted)
   @Override
   public void start(Quiet initializationLogic) throws StorageException {
     checkInState(State.PREPARED);
     wrapped.start(initializationLogic);
     stateMachine.transition(State.READY);
+    wrapped.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
+        Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy(
+            storeProvider.getTaskStore().fetchTasks(Query.unscoped()));
+        for (IScheduledTask task : tasks) {
+          eventSink.post(TaskStateChange.initialized(task));
+        }
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
index 5937e9d..fd4d5d5 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
@@ -28,17 +28,15 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.TypeLiteral;
 import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Closure;
 import com.twitter.common.base.Command;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -108,7 +106,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
   private TaskLauncher systemLauncher;
   private TaskLauncher userLauncher;
   private SchedulerDriver driver;
-  private Closure<PubsubEvent> eventBus;
+  private EventSink eventSink;
 
   private MesosSchedulerImpl scheduler;
 
@@ -119,7 +117,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         new Lifecycle(createMock(Command.class), createMock(UncaughtExceptionHandler.class));
     systemLauncher = createMock(TaskLauncher.class);
     userLauncher = createMock(TaskLauncher.class);
-    eventBus = createMock(new Clazz<Closure<PubsubEvent>>() { });
+    eventSink = createMock(EventSink.class);
 
     Injector injector = Guice.createInjector(new AbstractModule() {
       @Override protected void configure() {
@@ -128,8 +126,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         bind(Lifecycle.class).toInstance(lifecycle);
         bind(new TypeLiteral<List<TaskLauncher>>() { })
             .toInstance(Arrays.asList(systemLauncher, userLauncher));
-        bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventBus);
-        PubsubEventModule.bindNotifyingInterceptor(binder());
+        bind(EventSink.class).toInstance(eventSink);
       }
     });
     scheduler = injector.getInstance(MesosSchedulerImpl.class);
@@ -260,7 +257,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
   public void testDisconnected() throws Exception {
     new RegisteredFixture() {
       @Override void expectations() throws Exception {
-        eventBus.execute(new DriverDisconnected());
+        eventSink.post(new DriverDisconnected());
       }
 
       @Override void test() {
@@ -287,7 +284,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
 
     void run() throws Exception {
       runCalled.set(true);
-      eventBus.execute(new DriverRegistered());
+      eventSink.post(new DriverRegistered());
       storageUtil.expectOperations();
       storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
       expectations();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index fc789e7..24cd6fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -26,7 +26,9 @@ import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
 
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.Status;
@@ -52,6 +54,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   private LeaderControl leaderControl;
   private SchedulerDriver schedulerDriver;
   private DelayedActions delayedActions;
+  private EventSink eventSink;
 
   private SchedulerLifecycle schedulerLifecycle;
 
@@ -65,6 +68,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     leaderControl = createMock(LeaderControl.class);
     schedulerDriver = createMock(SchedulerDriver.class);
     delayedActions = createMock(DelayedActions.class);
+    eventSink = createMock(EventSink.class);
     schedulerLifecycle = new SchedulerLifecycle(
         driverFactory,
         storageUtil.storage,
@@ -76,7 +80,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
         driver,
         driverRef,
         delayedActions,
-        createMock(Clock.class));
+        createMock(Clock.class),
+        eventSink);
   }
 
   @Test
@@ -98,6 +103,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
 
     leaderControl.advertise();
+    eventSink.post(new SchedulerActive());
     leaderControl.leave();
     driver.stop();
     storageUtil.storage.stop();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index f09acfa..dde053c 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -31,8 +31,7 @@ import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -67,28 +66,31 @@ public class TaskVarsTest extends EasyMockTest {
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
     trackedStats = createMock(StatsProvider.class);
+    vars = new TaskVars(storageUtil.storage, trackedStats);
+
+    storageUtil.expectOperations();
+    globalCounters = Maps.newHashMap();
   }
 
-  private void initialize() {
-    vars = new TaskVars(storageUtil.storage, trackedStats);
-    vars.storageStarted(new StorageStarted());
+  private void expectStatusCountersInitialized() {
+    for (ScheduleStatus status : ScheduleStatus.values()) {
+      AtomicLong counter = new AtomicLong(0);
+      globalCounters.put(status, counter);
+      expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+    }
   }
 
   private void changeState(IScheduledTask task, ScheduleStatus status) {
-    vars.taskChangedState(new TaskStateChange(
+    vars.taskChangedState(TaskStateChange.transition(
         IScheduledTask.build(task.newBuilder().setStatus(status)),
         task.getStatus()));
   }
 
-  private void expectLoadStorage(IScheduledTask... result) {
-    storageUtil.expectOperations();
-    storageUtil.expectTaskFetch(Query.unscoped(), result);
-    globalCounters = Maps.newHashMap();
-    for (ScheduleStatus status : ScheduleStatus.values()) {
-      AtomicLong counter = new AtomicLong(0);
-      globalCounters.put(status, counter);
-      expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+  private void schedulerActivated(IScheduledTask... initialTasks) {
+    for (IScheduledTask task : initialTasks) {
+      vars.taskChangedState(TaskStateChange.initialized(task));
     }
+    vars.schedulerActive(new SchedulerActive());
   }
 
   private IScheduledTask makeTask(String job, ScheduleStatus status, String host) {
@@ -114,10 +116,10 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testStartsAtZero() {
-    expectLoadStorage();
+    expectStatusCountersInitialized();
 
     control.replay();
-    initialize();
+    schedulerActivated();
 
     assertAllZero();
   }
@@ -132,10 +134,10 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testTaskLifeCycle() {
-    expectLoadStorage();
+    expectStatusCountersInitialized();
 
     control.replay();
-    initialize();
+    schedulerActivated();
 
     IScheduledTask taskA = makeTask(JOB_A, INIT);
     changeState(taskA, PENDING);
@@ -156,14 +158,15 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testLoadsFromStorage() {
-    expectLoadStorage(
+    expectStatusCountersInitialized();
+
+    control.replay();
+    schedulerActivated(
         makeTask(JOB_A, PENDING),
         makeTask(JOB_A, RUNNING),
         makeTask(JOB_A, FINISHED),
         makeTask(JOB_B, PENDING),
         makeTask(JOB_B, FAILED));
-    control.replay();
-    initialize();
 
     assertEquals(2, globalCounters.get(PENDING).get());
     assertEquals(1, globalCounters.get(RUNNING).get());
@@ -182,7 +185,7 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testLostCounters() {
-    expectLoadStorage();
+    expectStatusCountersInitialized();
     expectGetHostRack("host1", "rackA").atLeastOnce();
     expectGetHostRack("host2", "rackB").atLeastOnce();
     expectGetHostRack("host3", "rackB").atLeastOnce();
@@ -193,7 +196,7 @@ public class TaskVarsTest extends EasyMockTest {
     expect(trackedStats.makeCounter(TaskVars.rackStatName("rackB"))).andReturn(rackB);
 
     control.replay();
-    initialize();
+    schedulerActivated();
 
     IScheduledTask a = makeTask("jobA", RUNNING, "host1");
     IScheduledTask b = makeTask("jobB", RUNNING, "host2");
@@ -211,29 +214,15 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testRackMissing() {
-    expectLoadStorage();
+    expectStatusCountersInitialized();
     expect(storageUtil.attributeStore.getHostAttributes("a"))
         .andReturn(Optional.<HostAttributes>absent());
 
     control.replay();
-    initialize();
+    schedulerActivated();
 
     IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
     changeState(a, LOST);
     // Since no attributes are stored for the host, a variable is not exported/updated.
   }
-
-  @Test
-  public void testDeleteBeforeStorageStarted() {
-    expectLoadStorage();
-
-    control.replay();
-
-    vars = new TaskVars(storageUtil.storage, trackedStats);
-    vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(makeTask("jobA", RUNNING, "host1"))));
-    assertEquals(0, globalCounters.get(RUNNING).get());
-
-    vars.storageStarted(new StorageStarted());
-    assertEquals(0, globalCounters.get(RUNNING).get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
index 530254a..afb7ce9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
@@ -39,12 +39,10 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -78,7 +76,6 @@ public class HistoryPrunerTest extends EasyMockTest {
   private ScheduledFuture<?> future;
   private ScheduledExecutorService executor;
   private FakeClock clock;
-  private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private HistoryPruner pruner;
 
@@ -87,12 +84,9 @@ public class HistoryPrunerTest extends EasyMockTest {
     future = createMock(new Clazz<ScheduledFuture<?>>() { });
     executor = createMock(ScheduledExecutorService.class);
     clock = new FakeClock();
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
     pruner = new HistoryPruner(
         executor,
-        storageUtil.storage,
         stateManager,
         clock,
         ONE_DAY,
@@ -109,16 +103,7 @@ public class HistoryPrunerTest extends EasyMockTest {
   }
 
   @Test
-  public void testNoTasksOnStorageStart() {
-    expectGetInactiveTasks();
-
-    control.replay();
-
-    pruner.storageStarted(new StorageStarted());
-  }
-
-  @Test
-  public void testStorageStartWithoutPruning() {
+  public void testNoPruning() {
     long taskATimestamp = clock.nowMillis();
     IScheduledTask a = makeTask("a", FINISHED);
 
@@ -126,7 +111,6 @@ public class HistoryPrunerTest extends EasyMockTest {
     long taskBTimestamp = clock.nowMillis();
     IScheduledTask b = makeTask("b", LOST);
 
-    expectGetInactiveTasks(a, b);
     expectOneTaskWatch(taskATimestamp);
     expectOneTaskWatch(taskBTimestamp);
 
@@ -134,7 +118,8 @@ public class HistoryPrunerTest extends EasyMockTest {
 
     control.replay();
 
-    pruner.storageStarted(new StorageStarted());
+    pruner.recordStateChange(TaskStateChange.initialized(a));
+    pruner.recordStateChange(TaskStateChange.initialized(b));
 
     // Clean-up
     pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a, b)));
@@ -157,7 +142,6 @@ public class HistoryPrunerTest extends EasyMockTest {
     IScheduledTask d = makeTask("d", FINISHED);
     IScheduledTask e = makeTask("job-x", "e", FINISHED);
 
-    expectGetInactiveTasks(a, b, c, d, e);
     expectOneTaskWatch(taskATimestamp);
     expectOneTaskWatch(taskBTimestamp);
     expectOneTaskWatch(taskCTimestamp);
@@ -176,7 +160,9 @@ public class HistoryPrunerTest extends EasyMockTest {
 
     control.replay();
 
-    pruner.storageStarted(new StorageStarted());
+    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+      pruner.recordStateChange(TaskStateChange.initialized(task));
+    }
 
     // Clean-up
     pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(c, d, e)));
@@ -243,13 +229,12 @@ public class HistoryPrunerTest extends EasyMockTest {
   public void testTasksDeleted() {
     IScheduledTask a = makeTask("a", FINISHED);
     IScheduledTask b = makeTask("b", FINISHED);
-    expectGetInactiveTasks(a);
     expectDefaultTaskWatch();
     expectCancelFuture();
 
     control.replay();
 
-    pruner.storageStarted(new StorageStarted());
+    pruner.recordStateChange(TaskStateChange.initialized(a));
 
     // Cancels existing future for task 'a'
     pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
@@ -315,7 +300,6 @@ public class HistoryPrunerTest extends EasyMockTest {
             .build());
     return new HistoryPruner(
         realExecutor,
-        storageUtil.storage,
         stateManager,
         clock,
         Amount.of(1L, Time.MILLISECONDS),
@@ -392,15 +376,10 @@ public class HistoryPrunerTest extends EasyMockTest {
 
   private IScheduledTask changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
     IScheduledTask task = makeTask(taskId, to);
-    pruner.recordStateChange(new PubsubEvent.TaskStateChange(task, from));
+    pruner.recordStateChange(TaskStateChange.transition(task, from));
     return task;
   }
 
-  private void expectGetInactiveTasks(IScheduledTask... tasks) {
-    expect(storageUtil.taskStore.fetchTasks(HistoryPruner.INACTIVE_QUERY))
-        .andReturn(ImmutableSet.copyOf(tasks));
-  }
-
   private IScheduledTask makeTask(
       String job,
       String taskId,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 4c17176..35dd666 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.twitter.common.base.Closure;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -33,7 +32,8 @@ import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -62,7 +62,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private Preemptor preemptor;
   private Amount<Long, Time> reservationDuration;
   private Amount<Long, Time> halfReservationDuration;
-  private Closure<PubsubEvent> eventSink;
+  private EventSink eventSink;
 
   @Before
   public void setUp() throws Exception {
@@ -163,7 +163,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
     assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
     firstAssignment.getValue().apply(offerA);
-    eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+    eventSink.post(TaskStateChange.transition(taskA, PENDING));
     clock.advance(halfReservationDuration);
     assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
     secondAssignment.getValue().apply(offerA);
@@ -222,7 +222,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
     clock.advance(halfReservationDuration);
     // Task is killed by user before it is scheduled
-    eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+    eventSink.post(TaskStateChange.transition(taskA, PENDING));
     assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
     assignment.getValue().apply(offerA);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 8bc15a4..9698f28 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.common.quantity.Amount;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -144,7 +144,6 @@ public class TaskSchedulerTest extends EasyMockTest {
         clock);
     taskGroups = new TaskGroups(
         executor,
-        storage,
         retryStrategy,
         rateLimiter,
         scheduler,
@@ -185,7 +184,7 @@ public class TaskSchedulerTest extends EasyMockTest {
         }
       }
     });
-    taskGroups.taskChangedState(new TaskStateChange(copy, oldState));
+    taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
   }
 
   private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
@@ -253,16 +252,17 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
+    final IScheduledTask a = makeTask("a", KILLED);
+    final IScheduledTask b = makeTask("b", PENDING);
     final IScheduledTask c = makeTask("c", RUNNING);
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
-            makeTask("a", KILLED),
-            makeTask("b", PENDING),
-            c));
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
       }
     });
-    taskGroups.storageStarted(new StorageStarted());
+    for (IScheduledTask task : ImmutableList.of(a, b, c)) {
+      taskGroups.taskChangedState(TaskStateChange.initialized(task));
+    }
     changeState(c, RUNNING, FINISHED);
   }
 
@@ -272,7 +272,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    taskGroups.taskChangedState(new TaskStateChange(makeTask("a", PENDING), INIT));
+    taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
     timeoutCapture.getValue().run();
   }
 
@@ -581,7 +581,8 @@ public class TaskSchedulerTest extends EasyMockTest {
     // Ensure the offer was consumed.
     changeState(task, INIT, PENDING);
     storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 375b6c2..023905b 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -36,11 +36,9 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IExpectationSetters;
@@ -73,7 +71,6 @@ public class TaskTimeoutTest extends EasyMockTest {
   private Capture<Supplier<Number>> stateCountCapture;
   private Map<ScheduleStatus, Capture<Supplier<Number>>> stateCaptures;
 
-  private StorageTestUtil storageUtil;
   private ScheduledExecutorService executor;
   private ScheduledFuture<?> future;
   private StateManager stateManager;
@@ -83,8 +80,6 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
     executor = createMock(ScheduledExecutorService.class);
     future = createMock(new Clazz<ScheduledFuture<?>>() { });
     stateManager = createMock(StateManager.class);
@@ -122,7 +117,6 @@ public class TaskTimeoutTest extends EasyMockTest {
   private void replayAndCreate() {
     control.replay();
     timeout = new TaskTimeout(
-        storageUtil.storage,
         executor,
         stateManager,
         clock,
@@ -152,7 +146,7 @@ public class TaskTimeoutTest extends EasyMockTest {
     IScheduledTask task = IScheduledTask.build(new ScheduledTask()
         .setStatus(to)
         .setAssignedTask(new AssignedTask().setTaskId(taskId)));
-    timeout.recordStateChange(new TaskStateChange(task, from));
+    timeout.recordStateChange(TaskStateChange.transition(task, from));
   }
 
   private void changeState(ScheduleStatus from, ScheduleStatus to) {
@@ -234,13 +228,7 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   @Test
   public void testStorageStart() {
-    clock.setNowMillis(TIMEOUT_MS * 2);
-    storageUtil.expectTaskFetch(
-        TaskTimeout.TRANSIENT_QUERY,
-        makeTask("a", ASSIGNED, 0),
-        makeTask("b", KILLING, TIMEOUT_MS),
-        makeTask("c", PREEMPTING, TIMEOUT_MS * 3) /* In the future */
-    );
+
     expectTaskWatch(TIMEOUT_MS);
     expectTaskWatch(TIMEOUT_MS);
     expectTaskWatch(TIMEOUT_MS);
@@ -248,24 +236,18 @@ public class TaskTimeoutTest extends EasyMockTest {
 
     replayAndCreate();
 
-    timeout.storageStarted(new StorageStarted());
-    changeState("a", ASSIGNED, RUNNING);
-    changeState("b", KILLING, KILLED);
-    changeState("c", PREEMPTING, FINISHED);
-  }
-
-  @Test
-  public void testStorageStartTwice() {
-    // This should never happen, but testing that the class handles it gracefully.
-    storageUtil.expectTaskFetch(TaskTimeout.TRANSIENT_QUERY, makeTask("a", ASSIGNED, 0)).times(2);
-    expectTaskWatch();
-    expectCancel();
+    clock.setNowMillis(TIMEOUT_MS * 2);
+    for (IScheduledTask task : ImmutableList.of(
+        makeTask("a", ASSIGNED, 0),
+        makeTask("b", KILLING, TIMEOUT_MS),
+        makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT_MS))) {
 
-    replayAndCreate();
+      timeout.recordStateChange(TaskStateChange.initialized(task));
+    }
 
-    timeout.storageStarted(new StorageStarted());
-    timeout.storageStarted(new StorageStarted());
     changeState("a", ASSIGNED, RUNNING);
+    changeState("b", KILLING, KILLED);
+    changeState("c", PREEMPTING, FINISHED);
   }
 
   private void checkOutstandingTimer(ScheduleStatus status, long expectedValue) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
deleted file mode 100644
index a38d208..0000000
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.events;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matchers;
-import com.twitter.common.base.Closure;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class NotifyingMethodInterceptorTest extends EasyMockTest {
-
-  private Closure<PubsubEvent> eventSink;
-  private NotifyingMethodInterceptor interceptor;
-
-  @Before
-  public void setUp() throws Exception {
-    eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
-    Injector injector = Guice.createInjector(new AbstractModule() {
-      @Override protected void configure() {
-        bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventSink);
-        NotifyingMethodInterceptor bound = new NotifyingMethodInterceptor();
-        bind(NotifyingMethodInterceptor.class).toInstance(bound);
-        requestInjection(bound);
-      }
-    });
-    interceptor = injector.getInstance(NotifyingMethodInterceptor.class);
-  }
-
-  @Test
-  public void testNotifications() {
-    eventSink.execute(new DriverRegistered());
-    eventSink.execute(new StorageStarted());
-    eventSink.execute(new DriverRegistered());
-
-    control.replay();
-
-    Injector injector = Guice.createInjector(new AbstractModule() {
-      @Override protected void configure() {
-        bind(Math.class).in(Singleton.class);
-        bindInterceptor(
-            Matchers.any(),
-            Matchers.annotatedWith(SendNotification.class),
-            interceptor);
-      }
-    });
-
-    Math math = injector.getInstance(Math.class);
-    assertEquals(4, math.add(2, 2));
-    assertEquals(0, math.subtract(2, 2));
-    assertEquals(4, math.multiply(2, 2));
-    assertEquals(1, math.divide(2, 2));
-  }
-
-  static class Math {
-    @SendNotification(before = Event.DriverRegistered, after = Event.StorageStarted)
-    int add(int a, int b) {
-      return a + b;
-    }
-
-    @SendNotification(after = Event.DriverRegistered)
-    int subtract(int a, int b) {
-      return a - b;
-    }
-
-    @SendNotification
-    int multiply(int a, int b) {
-      return a * b;
-    }
-
-    int divide(int a, int b) {
-      return a / b;
-    }
-  }
-}