You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:31 UTC

[38/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
new file mode 100644
index 0000000..f90869d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.StorageBackfill;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.StateMachine;
+import com.twitter.common.util.StateMachine.Transition;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
+/**
+ * The central driver of the scheduler runtime lifecycle.  Handles the transitions from startup and
+ * initialization through acting as a standby scheduler / log replica and finally to becoming the
+ * scheduler leader.
+ * <p>
+ * The (enforced) call order to be used with this class is:
+ * <ol>
+ *   <li>{@link #prepare()}, to initialize the storage system.</li>
+ *   <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
+ *       {@link LeadershipListener LeadershipListener}
+ *       returned from {@link #prepare()}, signaling that this process has exclusive control of the
+ *       cluster.</li>
+ *   <li>{@link #registered(DriverRegistered) registered()},
+ *       indicating that registration with the mesos master has succeeded.
+ *       At this point, the scheduler's presence will be announced via
+ *       {@link LeaderControl#advertise() advertise()}.</li>
+ * </ol>
+ * If this call order is broken, calls will fail by throwing
+ * {@link java.lang.IllegalStateException}.
+ * <p>
+ * At any point in the lifecycle, the scheduler will respond to
+ * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
+ * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
+ * A clean shutdown will also be initiated if control actions fail during normal state transitions.
+ */
+public class SchedulerLifecycle implements EventSubscriber {
+
+  private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+
+  /**
+   * States of the scheduler lifecycle state machine.  A 0/1 stat named
+   * {@code scheduler_lifecycle_<STATE>} is exported for each constant.
+   */
+  private enum State {
+    // Initial state of the state machine.
+    IDLE,
+    // Requested by prepare().
+    PREPARING_STORAGE,
+    // Requested once storage.prepare() has returned normally.
+    STORAGE_PREPARED,
+    // Requested when the leadership listener is told this process is leading.
+    LEADER_AWAITING_REGISTRATION,
+    // Requested when a DriverRegistered event is received.
+    REGISTERED_LEADER,
+    // Reachable from REGISTERED_LEADER; nothing in this class requests it.
+    RUNNING,
+    // Terminal state; any transition into it triggers the shutdown sequence.
+    DEAD
+  }
+
+  // Matches transitions whose destination state is DEAD.
+  private static final Predicate<Transition<State>> IS_DEAD = new Predicate<Transition<State>>() {
+    @Override public boolean apply(Transition<State> transition) {
+      return State.DEAD == transition.getTo();
+    }
+  };
+
+  // Matches every transition that does not end in DEAD.
+  private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
+
+  private final LeadershipListener leadershipListener;
+  private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
+  private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
+  private final StateMachine<State> stateMachine;
+
+  /**
+   * Injected constructor.  Delegates to the test-visible constructor with a
+   * {@link DelayedActions} implementation backed by {@code executorService} and the time limits
+   * in {@code leadingOptions}.
+   */
+  @Inject
+  SchedulerLifecycle(
+      final DriverFactory driverFactory,
+      final NonVolatileStorage storage,
+      final Lifecycle lifecycle,
+      final Driver driver,
+      final DriverReference driverRef,
+      final LeadingOptions leadingOptions,
+      final ScheduledExecutorService executorService,
+      final Clock clock) {
+
+    this(
+        driverFactory,
+        storage,
+        lifecycle,
+        driver,
+        driverRef,
+        new DelayedActions() {
+          @Override public void blockingDriverJoin(Runnable joinTask) {
+            // Runs immediately on the executor so the caller is not blocked.
+            executorService.execute(joinTask);
+          }
+
+          @Override public void onAutoFailover(Runnable failoverTask) {
+            Amount<Long, Time> limit = leadingOptions.leadingTimeLimit;
+            executorService.schedule(
+                failoverTask,
+                limit.getValue(),
+                limit.getUnit().getTimeUnit());
+          }
+
+          @Override public void onRegistrationTimeout(Runnable timeoutTask) {
+            Amount<Long, Time> limit = leadingOptions.registrationDelayLimit;
+            LOG.info("Giving up on registration in " + limit);
+            executorService.schedule(
+                timeoutTask,
+                limit.getValue(),
+                limit.getUnit().getTimeUnit());
+          }
+        },
+        clock);
+  }
+
+  /**
+   * Builds the lifecycle and its state machine.  All deferred or asynchronous work is routed
+   * through {@code delayedActions}, so a caller can substitute its own scheduling.
+   *
+   * @param driverFactory Creates the driver published through {@code driverRef}, given the
+   *                      framework ID read from storage.
+   * @param storage Storage that is prepared, started and stopped as the lifecycle advances.
+   * @param lifecycle Application lifecycle that is shut down when DEAD is reached.
+   * @param driver Driver that is started when leading and stopped during shutdown.
+   * @param driverRef Holder that receives the driver created by {@code driverFactory}.
+   * @param delayedActions Strategy for the blocking driver join and the two timeout callbacks.
+   * @param clock Clock handed to the storage backfill.
+   */
+  @VisibleForTesting
+  SchedulerLifecycle(
+      final DriverFactory driverFactory,
+      final NonVolatileStorage storage,
+      final Lifecycle lifecycle,
+      // TODO(wfarner): The presence of Driver and DriverReference is quite confusing.  Figure out
+      //                a clean way to collapse the duties of DriverReference into DriverImpl.
+      final Driver driver,
+      final DriverReference driverRef,
+      final DelayedActions delayedActions,
+      final Clock clock) {
+
+    // Export a 0/1 stat reflecting whether framework registration has been acknowledged.
+    Stats.export(new StatImpl<Integer>("framework_registered") {
+      @Override public Integer read() {
+        return registrationAcked.get() ? 1 : 0;
+      }
+    });
+    // Export one 0/1 stat per state; the stat reads 1 while the machine is in that state.
+    for (final State state : State.values()) {
+      Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
+        @Override public Integer read() {
+          return (state == stateMachine.getState()) ? 1 : 0;
+        }
+      });
+    }
+
+    // Prepares storage, then advances to STORAGE_PREPARED.  A runtime failure moves the machine
+    // to DEAD before the exception is rethrown to the caller.
+    final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        try {
+          storage.prepare();
+          stateMachine.transition(State.STORAGE_PREPARED);
+        } catch (RuntimeException e) {
+          stateMachine.transition(State.DEAD);
+          throw e;
+        }
+      }
+    };
+
+    // Leadership handler: starts storage (running the backfill), creates the driver from the
+    // stored framework ID, arms the registration and failover timers, then starts the driver.
+    final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        LOG.info("Elected as leading scheduler!");
+        storage.start(new MutateWork.NoResult.Quiet() {
+          @Override protected void execute(MutableStoreProvider storeProvider) {
+            StorageBackfill.backfill(storeProvider, clock);
+          }
+        });
+
+        @Nullable final String frameworkId = storage.consistentRead(
+            new Work.Quiet<String>() {
+              @Override public String apply(StoreProvider storeProvider) {
+                return storeProvider.getSchedulerStore().fetchFrameworkId();
+              }
+            });
+        driverRef.set(driverFactory.apply(frameworkId));
+
+        // If registration has not been acknowledged when this callback fires, give up and die.
+        delayedActions.onRegistrationTimeout(
+            new Runnable() {
+              @Override public void run() {
+                if (!registrationAcked.get()) {
+                  LOG.severe(
+                      "Framework has not been registered within the tolerated delay.");
+                  stateMachine.transition(State.DEAD);
+                }
+              }
+            });
+
+        // Unconditionally abdicate when the failover callback fires.
+        delayedActions.onAutoFailover(
+            new Runnable() {
+              @Override public void run() {
+                LOG.info("Triggering automatic failover.");
+                stateMachine.transition(State.DEAD);
+              }
+            });
+
+        Protos.Status status = driver.start();
+        LOG.info("Driver started with code " + status);
+        // The join blocks, so it is handed off rather than run inline; a driver exit is treated
+        // as fatal.
+        delayedActions.blockingDriverJoin(new Runnable() {
+          @Override public void run() {
+            // Blocks until driver exits.
+            driver.join();
+            stateMachine.transition(State.DEAD);
+          }
+        });
+      }
+    };
+
+    // Registration handler: records the acknowledgement and advertises leadership.  Any failure
+    // to advertise moves the machine to DEAD.
+    final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        registrationAcked.set(true);
+        try {
+          leaderControl.get().advertise();
+        } catch (JoinException e) {
+          LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
+          stateMachine.transition(State.DEAD);
+        } catch (InterruptedException e) {
+          LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
+          stateMachine.transition(State.DEAD);
+          // Restore the interrupt flag for code further up the stack.
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+
+    // Shutdown handler.  The compare-and-set guard makes the body run at most once even though
+    // DEAD may be requested repeatedly and from several callbacks.
+    final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
+      private final AtomicBoolean invoked = new AtomicBoolean(false);
+      @Override public void execute(Transition<State> transition) {
+        if (!invoked.compareAndSet(false, true)) {
+          LOG.info("Shutdown already invoked, ignoring extra call.");
+          return;
+        }
+
+        // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
+        // resources here.
+        try {
+          // leaderControl is only set once onLeading() has been called, so it may be null here.
+          LeaderControl control = leaderControl.get();
+          if (control != null) {
+            try {
+              control.leave();
+            } catch (JoinException e) {
+              LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
+            } catch (ServerSet.UpdateException e) {
+              LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
+            }
+          }
+
+          // TODO(wfarner): Re-evaluate tear-down ordering here.  Should the top-level shutdown
+          // be invoked first, or the underlying critical components?
+          driver.stop();
+          storage.stop();
+        } finally {
+          // Always shut the application down, even if stopping the driver or storage throws.
+          lifecycle.shutdown();
+        }
+      }
+    };
+
+    // NOTE(review): each addState() call appears to name a state followed by the states it may
+    // move to, with the optional closure invoked for transitions out of that state -- confirm
+    // against StateMachine.Builder.  The NOT_DEAD filter keeps the handlers from running when
+    // the destination is DEAD; shutDown runs for any transition whose destination is DEAD.
+    stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
+        .initialState(State.IDLE)
+        .logTransitions()
+        .addState(
+            Closures.filter(NOT_DEAD, prepareStorage),
+            State.IDLE,
+            State.PREPARING_STORAGE, State.DEAD)
+        .addState(
+            State.PREPARING_STORAGE,
+            State.STORAGE_PREPARED, State.DEAD)
+        .addState(
+            Closures.filter(NOT_DEAD, handleLeading),
+            State.STORAGE_PREPARED,
+            State.LEADER_AWAITING_REGISTRATION, State.DEAD)
+        .addState(
+            Closures.filter(NOT_DEAD, handleRegistered),
+            State.LEADER_AWAITING_REGISTRATION,
+            State.REGISTERED_LEADER, State.DEAD)
+        .addState(
+            State.REGISTERED_LEADER,
+            State.RUNNING, State.DEAD)
+        .addState(
+            State.RUNNING,
+            State.DEAD)
+        .addState(
+            State.DEAD,
+            // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
+            State.DEAD
+        )
+        .onAnyTransition(
+            Closures.filter(IS_DEAD, shutDown))
+        .build();
+
+    // The listener shares the state machine and the leader-control holder with this instance.
+    this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
+  }
+
+  /**
+   * Prepares a scheduler to offer itself as a leader candidate.  After this call the scheduler will
+   * host a live log replica and start syncing data from the leader via the log until it gets called
+   * upon to lead.
+   * <p>
+   * Requests the {@code PREPARING_STORAGE} transition on the lifecycle state machine.
+   *
+   * @return A listener that can be offered for leadership of a distributed election.
+   * @throws IllegalStateException If the call order documented on this class is violated.
+   */
+  public LeadershipListener prepare() {
+    stateMachine.transition(State.PREPARING_STORAGE);
+    return leadershipListener;
+  }
+
+  /**
+   * Event subscriber for driver registration.  Requests the {@code REGISTERED_LEADER} transition
+   * on the lifecycle state machine.
+   *
+   * @param event The registration event; its contents are not inspected.
+   */
+  @Subscribe
+  public void registered(DriverRegistered event) {
+    stateMachine.transition(State.REGISTERED_LEADER);
+  }
+
+  /**
+   * Holder for the scheduler driver.  Reads yield an absent value until a driver has been stored.
+   */
+  static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
+    private final AtomicReference<SchedulerDriver> holder =
+        new AtomicReference<SchedulerDriver>();
+
+    @Override public Optional<SchedulerDriver> get() {
+      SchedulerDriver current = holder.get();
+      return Optional.fromNullable(current);
+    }
+
+    // Invoked by the enclosing class once the driver has been created.
+    private void set(SchedulerDriver newDriver) {
+      holder.set(newDriver);
+    }
+  }
+
+  /**
+   * Leadership listener that turns election callbacks into lifecycle state transitions.
+   */
+  private static class SchedulerCandidateImpl implements LeadershipListener {
+    private final StateMachine<State> states;
+    private final AtomicReference<LeaderControl> controlHolder;
+
+    SchedulerCandidateImpl(
+        StateMachine<State> states,
+        AtomicReference<LeaderControl> controlHolder) {
+
+      this.states = states;
+      this.controlHolder = controlHolder;
+    }
+
+    // The control handle is stored before the transition is requested.
+    @Override public void onLeading(LeaderControl control) {
+      controlHolder.set(control);
+      states.transition(State.LEADER_AWAITING_REGISTRATION);
+    }
+
+    // Losing leadership is fatal: log it and request the DEAD state.
+    @Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
+      LOG.severe("Lost leadership, committing suicide.");
+      states.transition(State.DEAD);
+    }
+  }
+
+  /**
+   * Immutable time limits that tune how long the scheduler waits for registration and how long
+   * it serves as leader.
+   */
+  public static class LeadingOptions {
+    private final Amount<Long, Time> registrationDelayLimit;
+    private final Amount<Long, Time> leadingTimeLimit;
+
+    /**
+     * Creates a new collection of options for tuning leadership behavior.
+     *
+     * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
+     *                               complete.  Must not be negative.
+     * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating.  Must
+     *                         not be negative.
+     * @throws NullPointerException If either limit is {@code null}.
+     * @throws IllegalArgumentException If either limit is negative.
+     */
+    public LeadingOptions(
+        Amount<Long, Time> registrationDelayLimit,
+        Amount<Long, Time> leadingTimeLimit) {
+
+      // Null-check before dereferencing, so the explicit check is the one that reports a null.
+      this.registrationDelayLimit = checkNotNull(registrationDelayLimit);
+      this.leadingTimeLimit = checkNotNull(leadingTimeLimit);
+
+      // Zero is accepted, so the requirement is "non-negative" rather than "positive".
+      Preconditions.checkArgument(
+          registrationDelayLimit.getValue() >= 0,
+          "Registration delay limit must be non-negative, got %s.",
+          registrationDelayLimit);
+      Preconditions.checkArgument(
+          leadingTimeLimit.getValue() >= 0,
+          "Leading time limit must be non-negative, got %s.",
+          leadingTimeLimit);
+    }
+  }
+
+  /**
+   * Strategy for running the lifecycle's deferred and blocking work.  The injected constructor
+   * supplies an executor-backed implementation; the test-visible constructor accepts any
+   * implementation.
+   */
+  @VisibleForTesting
+  interface DelayedActions {
+    /**
+     * Runs work that blocks until the driver exits.
+     *
+     * @param runnable The blocking join work.
+     */
+    void blockingDriverJoin(Runnable runnable);
+
+    /**
+     * Arranges for the automatic failover action to run.  The executor-backed implementation
+     * delays it by the leading time limit.
+     *
+     * @param runnable The failover action.
+     */
+    void onAutoFailover(Runnable runnable);
+
+    /**
+     * Arranges for the registration timeout check to run.  The executor-backed implementation
+     * delays it by the registration delay limit.
+     *
+     * @param runnable The timeout check.
+     */
+    void onRegistrationTimeout(Runnable runnable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
new file mode 100644
index 0000000..be4c2b1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.scheduler.Driver.DriverImpl;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
+import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Binding module for top-level scheduling logic.
+ */
+public class SchedulerModule extends AbstractModule {
+
+  @CmdLine(name = "executor_gc_interval",
+      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
+  private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
+  private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
+
+  @CmdLine(name = "max_registration_delay",
+      help = "Max allowable delay to allow the driver to register before aborting")
+  private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @CmdLine(name = "max_leading_duration",
+      help = "After leading for this duration, the scheduler should commit suicide.")
+  private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
+      Arg.create(Amount.of(1L, Time.DAYS));
+
+  @Override
+  protected void configure() {
+    // Driver and the driver reference that other components read it through.
+    bind(Driver.class).to(DriverImpl.class);
+    bind(DriverImpl.class).in(Singleton.class);
+    bind(new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { }).to(DriverReference.class);
+    bind(DriverReference.class).in(Singleton.class);
+
+    // Mesos scheduler callback implementation.
+    bind(Scheduler.class).to(MesosSchedulerImpl.class);
+    bind(MesosSchedulerImpl.class).in(Singleton.class);
+
+    bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
+
+    // GC executor settings come straight from the command line arguments.
+    GcExecutorSettings gcSettings = new GcExecutorSettings(
+        EXECUTOR_GC_INTERVAL.get(),
+        Optional.fromNullable(GC_EXECUTOR_PATH.get()));
+    bind(GcExecutorSettings.class).toInstance(gcSettings);
+
+    bind(GcExecutorLauncher.class).in(Singleton.class);
+    bind(UserTaskLauncher.class).in(Singleton.class);
+
+    install(newLifecycleModule());
+
+    PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
+    PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
+  }
+
+  /**
+   * Creates the private module for {@link SchedulerLifecycle}.  Only the lifecycle itself is
+   * exposed; its options and its executor stay private to the module.
+   */
+  private static PrivateModule newLifecycleModule() {
+    return new PrivateModule() {
+      @Override protected void configure() {
+        bind(LeadingOptions.class).toInstance(
+            new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
+
+        // Single daemon thread for the lifecycle's delayed and blocking actions.
+        final ScheduledExecutorService lifecycleExecutor = Executors.newScheduledThreadPool(
+            1,
+            new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
+        bind(ScheduledExecutorService.class).toInstance(lifecycleExecutor);
+
+        bind(SchedulerLifecycle.class).in(Singleton.class);
+        expose(SchedulerLifecycle.class);
+      }
+    };
+  }
+
+  /**
+   * Provides the task launchers in the order listed: the GC executor launcher first, then the
+   * user task launcher.
+   */
+  @Provides
+  @Singleton
+  List<TaskLauncher> provideTaskLaunchers(
+      GcExecutorLauncher gcLauncher,
+      UserTaskLauncher userTaskLauncher) {
+
+    return ImmutableList.<TaskLauncher>of(gcLauncher, userTaskLauncher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
new file mode 100644
index 0000000..240649e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Preconditions;
+
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.util.Clock;
+
+/**
+ * A function that generates universally-unique (not guaranteed, but highly confident) task IDs.
+ */
+public interface TaskIdGenerator {
+
+  /**
+   * Generates a universally-unique ID for the task.  This is not necessarily a repeatable
+   * operation, two subsequent invocations with the same object need not return the same value.
+   *
+   * @param task Configuration of the task to create an ID for.
+   * @param instanceId Instance ID for the task.
+   * @return A universally-unique ID for the task.
+   */
+  String generate(ITaskConfig task, int instanceId);
+
+  /**
+   * Generator that joins the current time, the task's role, environment and job name, the
+   * instance ID and a random UUID with dashes, then replaces every character outside
+   * {@code [\w-]} with a dash.
+   */
+  class TaskIdGeneratorImpl implements TaskIdGenerator {
+    private static final String SEPARATOR = "-";
+
+    // Compiled once; String.replaceAll would recompile the expression on every call.
+    private static final java.util.regex.Pattern DISALLOWED_CHARS =
+        java.util.regex.Pattern.compile("[^\\w-]");
+
+    private final Clock clock;
+
+    @Inject
+    TaskIdGeneratorImpl(Clock clock) {
+      this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @Override
+    public String generate(ITaskConfig task, int instanceId) {
+      String rawId = new StringBuilder()
+          .append(clock.nowMillis())               // Allows chronological sorting.
+          .append(SEPARATOR)
+          .append(task.getOwner().getRole())       // Identification and collision prevention.
+          .append(SEPARATOR)
+          .append(task.getEnvironment())
+          .append(SEPARATOR)
+          .append(task.getJobName())
+          .append(SEPARATOR)
+          .append(instanceId)                      // Collision prevention within job.
+          .append(SEPARATOR)
+          .append(UUID.randomUUID())               // Just-in-case collision prevention.
+          .toString();
+
+      // Constrain character set.
+      return DISALLOWED_CHARS.matcher(rawId).replaceAll(SEPARATOR);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
new file mode 100644
index 0000000..aade6da
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+/**
+ * A receiver of resource offers and task status updates.
+ */
+public interface TaskLauncher {
+
+  /**
+   * Grants a resource offer to the task launcher.  If this launcher does not accept the offer,
+   * it will be passed to any subsequent task launchers.
+   * <p>
+   * A task launcher may choose to retain an offer for later use.  Any retained offers must be
+   * cleaned up with {@link #cancelOffer(OfferID)}.
+   *
+   * @param offer The resource offer.
+   * @return A task, absent if the launcher chooses not to accept the offer.
+   */
+  Optional<TaskInfo> createTask(Offer offer);
+
+  /**
+   * Informs the launcher that a status update has been received for a task.  If the task is not
+   * associated with the launcher, it should return {@code false} so that another launcher may
+   * receive it.
+   *
+   * @param status The status update.
+   * @return {@code true} if the status is relevant to the launcher and should not be delivered to
+   *         other launchers, {@code false} otherwise.
+   */
+  boolean statusUpdate(TaskStatus status);
+
+  /**
+   * Informs the launcher that a previously-advertised offer is canceled and may not be used.
+   *
+   * @param offer ID of the canceled offer.
+   */
+  void cancelOffer(OfferID offer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
new file mode 100644
index 0000000..5574631
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.stats.StatsProvider;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A container that tracks and exports stat counters for tasks.
+ */
+class TaskVars implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
+
+  // Selects the attribute named "rack".
+  private static final Predicate<Attribute> IS_RACK = new Predicate<Attribute>() {
+    @Override public boolean apply(Attribute attribute) {
+      return "rack".equals(attribute.getName());
+    }
+  };
+
+  // Extracts the single value of an attribute.
+  private static final Function<Attribute, String> ATTR_VALUE = new Function<Attribute, String>() {
+    @Override public String apply(Attribute attribute) {
+      return Iterables.getOnlyElement(attribute.getValues());
+    }
+  };
+
+  // Pubsub events that arrive before storage has fully started are ignored.  Otherwise another
+  // StorageStarted consumer could run ahead of storageStarted() here and fire events for tasks
+  // that have not been counted yet; a tasksDeleted event in that window would drive a count
+  // negative.
+  private volatile boolean storageStarted = false;
+
+  private final LoadingCache<String, AtomicLong> countersByStatus;
+  private final LoadingCache<String, AtomicLong> countersByRack;
+
+  private final Storage storage;
+
+  @Inject
+  TaskVars(Storage storage, final StatsProvider statProvider) {
+    this.storage = checkNotNull(storage);
+    checkNotNull(statProvider);
+
+    // Counters are created lazily, keyed by the exported stat name.
+    CacheLoader<String, AtomicLong> statusLoader = new CacheLoader<String, AtomicLong>() {
+      @Override public AtomicLong load(String statName) {
+        return statProvider.makeCounter(statName);
+      }
+    };
+    // Counters are created lazily, keyed by rack; the stat name is derived from the rack.
+    CacheLoader<String, AtomicLong> rackLoader = new CacheLoader<String, AtomicLong>() {
+      @Override public AtomicLong load(String rack) {
+        return statProvider.makeCounter(rackStatName(rack));
+      }
+    };
+
+    countersByStatus = CacheBuilder.newBuilder().build(statusLoader);
+    countersByRack = CacheBuilder.newBuilder().build(rackLoader);
+  }
+
+  @VisibleForTesting
+  static String getVarName(ScheduleStatus status) {
+    return "task_store_" + status;
+  }
+
+  @VisibleForTesting
+  static String rackStatName(String rack) {
+    return "tasks_lost_rack_" + rack;
+  }
+
+  // Returns the counter for a status, creating it on first use.
+  private AtomicLong getCounter(ScheduleStatus status) {
+    String statName = getVarName(status);
+    return countersByStatus.getUnchecked(statName);
+  }
+
+  // Reads the rack attribute value stored for a host, if there is one.
+  private Optional<String> findRack(final String host) {
+    return storage.consistentRead(new Work.Quiet<Optional<String>>() {
+      @Override public Optional<String> apply(StoreProvider storeProvider) {
+        return FluentIterable
+            .from(AttributeStore.Util.attributesOrNone(storeProvider, host))
+            .firstMatch(IS_RACK)
+            .transform(ATTR_VALUE);
+      }
+    });
+  }
+
+  /**
+   * Moves one count from the task's old status to its current status, and bumps the per-rack
+   * lost counter when the new state is LOST.
+   */
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    if (!storageStarted) {
+      return;
+    }
+
+    IScheduledTask task = stateChange.getTask();
+    ScheduleStatus previous = stateChange.getOldState();
+    // Nothing was counted under INIT, so there is nothing to take away.
+    if (previous != ScheduleStatus.INIT) {
+      getCounter(previous).decrementAndGet();
+    }
+    getCounter(task.getStatus()).incrementAndGet();
+
+    if (stateChange.getNewState() != ScheduleStatus.LOST) {
+      return;
+    }
+
+    String host = stateChange.getTask().getAssignedTask().getSlaveHost();
+    Optional<String> rack = findRack(host);
+    if (!rack.isPresent()) {
+      LOG.warning("Failed to find rack attribute associated with host " + host);
+      return;
+    }
+    countersByRack.getUnchecked(rack.get()).incrementAndGet();
+  }
+
+  /**
+   * Seeds the per-status counters from every stored task, then starts accepting pubsub events.
+   */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
+      getCounter(task.getStatus()).incrementAndGet();
+    }
+
+    // Touch the counter for every status so that a zero-valued stat exists for each one, even
+    // for statuses that no stored task currently has.
+    for (ScheduleStatus status : ScheduleStatus.values()) {
+      getCounter(status);
+    }
+    storageStarted = true;
+  }
+
+  /**
+   * Removes each deleted task from the count of its status.
+   */
+  @Subscribe
+  public void tasksDeleted(final TasksDeleted event) {
+    if (!storageStarted) {
+      return;
+    }
+
+    for (IScheduledTask deleted : event.getTasks()) {
+      getCounter(deleted.getStatus()).decrementAndGet();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
new file mode 100644
index 0000000..0fb3bbb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.async.OfferQueue;
+import com.twitter.aurora.scheduler.base.Conversions;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.state.StateManager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A task launcher that matches resource offers against user tasks.
+ */
+class UserTaskLauncher implements TaskLauncher {
+
+  private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
+
+  @VisibleForTesting
+  static final String MEMORY_LIMIT_EXCEEDED = "MEMORY STATISTICS";
+
+  @VisibleForTesting
+  static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
+
+  private final OfferQueue offerQueue;
+  private final StateManager stateManager;
+
+  @Inject
+  UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
+    this.offerQueue = checkNotNull(offerQueue);
+    this.stateManager = checkNotNull(stateManager);
+  }
+
+  @Override
+  public Optional<TaskInfo> createTask(Offer offer) {
+    checkNotNull(offer);
+
+    offerQueue.addOffer(offer);
+    return Optional.absent();
+  }
+
+  @Override
+  public synchronized boolean statusUpdate(TaskStatus status) {
+    @Nullable String message = null;
+    if (status.hasMessage()) {
+      message = status.getMessage();
+    }
+
+    try {
+      ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+      // TODO(William Farner): Remove this hack once Mesos API change is done.
+      //                       Tracked by: https://issues.apache.org/jira/browse/MESOS-343
+      if ((translatedState == ScheduleStatus.FAILED)
+          && (message != null)
+          && (message.contains(MEMORY_LIMIT_EXCEEDED))) {
+        message = MEMORY_LIMIT_DISPLAY;
+      }
+
+      stateManager.changeState(
+          Query.taskScoped(status.getTaskId().getValue()),
+          translatedState,
+          Optional.fromNullable(message));
+    } catch (SchedulerException e) {
+      LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
+      throw e;
+    }
+    return true;
+  }
+
+  @Override
+  public void cancelOffer(OfferID offer) {
+    offerQueue.cancelOffer(offer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
new file mode 100644
index 0000000..24702b0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.Provides;
+
+import org.apache.mesos.Scheduler;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.aurora.GuiceUtils;
+import com.twitter.aurora.scheduler.SchedulerModule;
+import com.twitter.aurora.scheduler.async.AsyncModule;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl;
+import com.twitter.aurora.scheduler.http.ClusterName;
+import com.twitter.aurora.scheduler.http.ServletModule;
+import com.twitter.aurora.scheduler.metadata.MetadataModule;
+import com.twitter.aurora.scheduler.quota.QuotaModule;
+import com.twitter.aurora.scheduler.state.StateModule;
+import com.twitter.aurora.scheduler.stats.AsyncStatsModule;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Command;
+import com.twitter.common.inject.TimedInterceptor;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.common.zookeeper.ServerSetImpl;
+import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.ZooKeeperClient;
+import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
+import com.twitter.common.zookeeper.ZooKeeperUtils;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+import com.twitter.thrift.ServiceInstance;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Binding module for the aurora scheduler application.
+ */
+class AppModule extends AbstractModule {
+  private static final Logger LOG = Logger.getLogger(AppModule.class.getName());
+
+  private final String clusterName;
+  private final String serverSetPath;
+  private final ClientConfig zkClientConfig;
+
+  AppModule(String clusterName, String serverSetPath, ClientConfig zkClientConfig) {
+    this.clusterName = checkNotBlank(clusterName);
+    this.serverSetPath = checkNotBlank(serverSetPath);
+    this.zkClientConfig = checkNotNull(zkClientConfig);
+  }
+
+  @Override
+  protected void configure() {
+    // Enable intercepted method timings and context classloader repair.
+    TimedInterceptor.bind(binder());
+    GuiceUtils.bindJNIContextClassLoader(binder(), Scheduler.class);
+    GuiceUtils.bindExceptionTrap(binder(), Scheduler.class);
+
+    bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+
+    bind(Key.get(String.class, ClusterName.class)).toInstance(clusterName);
+
+    // Filter layering: notifier filter -> base impl
+    PubsubEventModule.bind(binder(), SchedulingFilterImpl.class);
+    bind(SchedulingFilterImpl.class).in(Singleton.class);
+
+    LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);
+
+    install(new AsyncModule());
+    install(new AsyncStatsModule());
+    install(new MetadataModule());
+    install(new QuotaModule());
+    install(new ServletModule());
+    install(new SchedulerModule());
+    install(new StateModule());
+
+    bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
+  }
+
+  /**
+   * Command to register a thread stack printer that identifies initiator of a shutdown.
+   */
+  private static class RegisterShutdownStackPrinter implements Command {
+    private static final Function<StackTraceElement, String> STACK_ELEM_TOSTRING =
+        new Function<StackTraceElement, String>() {
+          @Override public String apply(StackTraceElement element) {
+            return element.getClassName() + "." + element.getMethodName()
+                + String.format("(%s:%s)", element.getFileName(), element.getLineNumber());
+          }
+        };
+
+    private final ShutdownRegistry shutdownRegistry;
+
+    @Inject
+    RegisterShutdownStackPrinter(ShutdownRegistry shutdownRegistry) {
+      this.shutdownRegistry = shutdownRegistry;
+    }
+
+    @Override
+    public void execute() {
+      shutdownRegistry.addAction(new Command() {
+        @Override public void execute() {
+          Thread thread = Thread.currentThread();
+          String message = new StringBuilder()
+              .append("Thread: ").append(thread.getName())
+              .append(" (id ").append(thread.getId()).append(")")
+              .append("\n")
+              .append(Joiner.on("\n  ").join(
+                  Iterables.transform(Arrays.asList(thread.getStackTrace()), STACK_ELEM_TOSTRING)))
+              .toString();
+
+          LOG.info("Shutdown initiated by: " + message);
+        }
+      });
+    }
+  }
+
+  @Provides
+  @Singleton
+  List<ACL> provideAcls() {
+    if (zkClientConfig.credentials == Credentials.NONE) {
+      LOG.warning("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled.");
+      return ZooKeeperUtils.OPEN_ACL_UNSAFE;
+    } else {
+      return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL;
+    }
+  }
+
+  @Provides
+  @Singleton
+  ServerSet provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) {
+    return new ServerSetImpl(client, zooKeeperAcls, serverSetPath);
+  }
+
+  @Provides
+  @Singleton
+  DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet serverSet) {
+    // Used for a type re-binding of the serverset.
+    return serverSet;
+  }
+
+  @Provides
+  @Singleton
+  SingletonService provideSingletonService(
+      ZooKeeperClient client,
+      ServerSet serverSet,
+      List<ACL> zookeeperAcls) {
+
+    return new SingletonService(
+        serverSet,
+        SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java b/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
new file mode 100644
index 0000000..0ea2204
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.logging.RootLogConfig.Configuration;
+import com.twitter.common.logging.log4j.GlogLayout;
+
+/**
+ * Configures log4j logging.
+ */
+final class Log4jConfigurator {
+  private static final java.util.logging.Logger LOG =
+      java.util.logging.Logger.getLogger(Log4jConfigurator.class.getName());
+
+  /**
+   * Configures log4j to log to stderr with a glog format.
+   *
+   * @param glogConfig The glog configuration in effect.
+   */
+  static void configureConsole(Configuration glogConfig) {
+    Preconditions.checkNotNull(glogConfig);
+
+    BasicConfigurator.configure(
+        new ConsoleAppender(new GlogLayout(), ConsoleAppender.SYSTEM_ERR));
+    Logger.getRootLogger().setLevel(getLevel(glogConfig));
+  }
+
+  private static Level getLevel(RootLogConfig.Configuration logConfig) {
+    switch (logConfig.getVlog()) {
+      case FINEST: // fall through
+      case FINER: // fall through
+      case FINE: // fall through
+      case CONFIG:
+        return Level.TRACE;
+      case INFO:
+        return Level.INFO;
+      case WARNING:
+        return Level.WARN;
+      case SEVERE:
+        return Level.ERROR;
+      default:
+        LOG.warning("Mapping unexpected vlog value of " + logConfig.getVlog() + " to log4j TRACE");
+        return Level.TRACE;
+    }
+  }
+
+  private Log4jConfigurator() {
+    // Utility class.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/Modules.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/Modules.java b/src/main/java/org/apache/aurora/scheduler/app/Modules.java
new file mode 100644
index 0000000..72a80e1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/Modules.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+
+/**
+ * A utility class for managing guice modules.
+ */
+final class Modules {
+
+  private Modules() {
+    // Utility class
+  }
+
+  private static Module instantiateModule(final Class<? extends Module> moduleClass) {
+    try {
+      return moduleClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to instantiate module %s. Are you sure it has a no-arg constructor?",
+              moduleClass.getName()),
+          e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to instantiate module %s. Are you sure it's public?",
+              moduleClass.getName()),
+          e);
+    }
+  }
+
+  // Defensively wrap each module provided on the command-line in a PrivateModule that only
+  // exposes requested classes to ensure that we don't depend on surprise extra bindings across
+  // different implementations.
+  static Module wrapInPrivateModule(
+      Class<? extends Module> moduleClass,
+      final Iterable<Class<?>> exposedClasses) {
+
+    final Module module = instantiateModule(moduleClass);
+    return new PrivateModule() {
+      @Override protected void configure() {
+        install(module);
+        for (Class<?> klass : exposedClasses) {
+          expose(klass);
+        }
+      }
+    };
+  }
+
+  static Module getModule(Class<? extends Module> moduleClass) {
+    return instantiateModule(moduleClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
new file mode 100644
index 0000000..693c364
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.annotation.Nonnegative;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.SessionValidator;
+import com.twitter.aurora.auth.UnsecureAuthModule;
+import com.twitter.aurora.scheduler.DriverFactory;
+import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
+import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
+import com.twitter.aurora.scheduler.SchedulerLifecycle;
+import com.twitter.aurora.scheduler.cron.CronPredictor;
+import com.twitter.aurora.scheduler.cron.CronScheduler;
+import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
+import com.twitter.aurora.scheduler.local.IsolatedSchedulerModule;
+import com.twitter.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import com.twitter.aurora.scheduler.storage.backup.BackupModule;
+import com.twitter.aurora.scheduler.storage.log.LogStorage;
+import com.twitter.aurora.scheduler.storage.log.LogStorageModule;
+import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import com.twitter.aurora.scheduler.storage.mem.MemStorageModule;
+import com.twitter.aurora.scheduler.thrift.ThriftConfiguration;
+import com.twitter.aurora.scheduler.thrift.ThriftModule;
+import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
+import com.twitter.common.application.AbstractApplication;
+import com.twitter.common.application.AppLauncher;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.application.modules.HttpModule;
+import com.twitter.common.application.modules.LocalServiceRegistry;
+import com.twitter.common.application.modules.LogModule;
+import com.twitter.common.application.modules.StatsModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotEmpty;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.inject.Bindings;
+import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
+
+/**
+ * Launcher for the aurora scheduler.
+ *
+ * <p>Assembles the Guice modules for the application from command line flags and, once
+ * injected, hands a leadership listener to the singleton service and waits for shutdown.
+ */
+public class SchedulerMain extends AbstractApplication {
+
+  private static final Logger LOG = Logger.getLogger(SchedulerMain.class.getName());
+
+  @CmdLine(name = "testing_isolated_scheduler",
+      help = "If true, run in a testing mode with the scheduler isolated from other components.")
+  private static final Arg<Boolean> ISOLATED_SCHEDULER = Arg.create(false);
+
+  @NotNull
+  @CmdLine(name = "cluster_name", help = "Name to identify the cluster being served.")
+  private static final Arg<String> CLUSTER_NAME = Arg.create();
+
+  @NotNull
+  @NotEmpty
+  @CmdLine(name = "serverset_path", help = "ZooKeeper ServerSet path to register at.")
+  private static final Arg<String> SERVERSET_PATH = Arg.create();
+
+  @CmdLine(name = "mesos_ssl_keyfile",
+      help = "JKS keyfile for operating the Mesos Thrift-over-SSL interface.")
+  private static final Arg<File> MESOS_SSL_KEY_FILE = Arg.create();
+
+  @Nonnegative
+  @CmdLine(name = "thrift_port", help = "Thrift server port.")
+  private static final Arg<Integer> THRIFT_PORT = Arg.create(0);
+
+  @NotNull
+  @CmdLine(name = "thermos_executor_path", help = "Path to the thermos executor launch script.")
+  private static final Arg<String> THERMOS_EXECUTOR_PATH = Arg.create();
+
+  @CmdLine(name = "auth_module",
+      help = "A Guice module to provide auth bindings. NOTE: The default is unsecure.")
+  private static final Arg<? extends Class<? extends Module>> AUTH_MODULE =
+      Arg.create(UnsecureAuthModule.class);
+
+  // The only bindings exposed from the privately-wrapped auth module.
+  private static final Iterable<Class<?>> AUTH_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
+      .add(SessionValidator.class)
+      .add(CapabilityValidator.class)
+      .build();
+
+  @CmdLine(name = "cron_module",
+      help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
+  private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
+      Arg.create(NoopCronModule.class);
+
+  // The only bindings exposed from the privately-wrapped cron module.
+  private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
+      .add(CronPredictor.class)
+      .add(CronScheduler.class)
+      .build();
+
+  // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
+  @CmdLine(name = "extra_modules",
+      help = "A list of modules that provide additional functionality.")
+  private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
+      Arg.create((List<Class<? extends Module>>) ImmutableList.<Class<? extends Module>>of());
+
+  @Inject private SingletonService schedulerService;
+  @Inject private LocalServiceRegistry serviceRegistry;
+  @Inject private SchedulerLifecycle schedulerLifecycle;
+  @Inject private Lifecycle appLifecycle;
+  @Inject private Optional<RootLogConfig.Configuration> glogConfig;
+
+  /**
+   * Returns the logging, HTTP and stats modules.
+   */
+  private static Iterable<? extends Module> getSystemModules() {
+    return ImmutableList.of(
+        new LogModule(),
+        new HttpModule(),
+        new StatsModule()
+    );
+  }
+
+  /**
+   * Returns the flag-selected modules: the auth and cron modules, each wrapped in a private
+   * module exposing only its expected classes, followed by every module listed in
+   * {@code extra_modules}, installed unwrapped.
+   */
+  private static Iterable<? extends Module> getExtraModules() {
+    Builder<Module> modules = ImmutableList.builder();
+    modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
+        .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
+
+    for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
+      modules.add(Modules.getModule(moduleClass));
+    }
+
+    return modules.build();
+  }
+
+  /**
+   * Builds the module list shared by every scheduler configuration.
+   *
+   * @param clusterName Name of the cluster being served.
+   * @param serverSetPath ZooKeeper ServerSet path to register at.
+   * @param zkClientConfig ZooKeeper client configuration passed to the application module.
+   * @return System, application, flag-selected, storage and thrift modules.
+   */
+  static Iterable<? extends Module> getModules(
+      String clusterName,
+      String serverSetPath,
+      ClientConfig zkClientConfig) {
+
+    return ImmutableList.<Module>builder()
+        .addAll(getSystemModules())
+        .add(new AppModule(clusterName, serverSetPath, zkClientConfig))
+        .addAll(getExtraModules())
+        .add(new LogStorageModule())
+        .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
+        .add(new ThriftModule())
+        .add(new ThriftAuthModule())
+        .build();
+  }
+
+  /**
+   * Builds the full module list for this process from command line flags.
+   *
+   * <p>In isolated testing mode the {@link IsolatedSchedulerModule} is used; otherwise the
+   * driver factory is bound and the mesos log stream module is installed.
+   */
+  @Override
+  public Iterable<? extends Module> getModules() {
+    Module additional;
+    final ClientConfig zkClientConfig = FlaggedClientConfig.create();
+    if (ISOLATED_SCHEDULER.get()) {
+      additional = new IsolatedSchedulerModule();
+    } else {
+      // TODO(Kevin Sweeney): Push these bindings down into a "production" module.
+      additional = new AbstractModule() {
+        @Override protected void configure() {
+          bind(DriverFactory.class).to(DriverFactoryImpl.class);
+          bind(DriverFactoryImpl.class).in(Singleton.class);
+          install(new MesosLogStreamModule(zkClientConfig));
+        }
+      };
+    }
+
+    // Bindings whose values come straight from command line flags.
+    Module configModule = new AbstractModule() {
+      @Override protected void configure() {
+        bind(ThriftConfiguration.class).toInstance(new ThriftConfiguration() {
+          @Override public Optional<InputStream> getSslKeyStream() throws FileNotFoundException {
+            // A new stream is opened on every call and is not closed here.
+            if (MESOS_SSL_KEY_FILE.hasAppliedValue()) {
+              return Optional.<InputStream>of(new FileInputStream(MESOS_SSL_KEY_FILE.get()));
+            } else {
+              return Optional.absent();
+            }
+          }
+
+          @Override public int getServingPort() {
+            return THRIFT_PORT.get();
+          }
+        });
+        bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
+      }
+    };
+
+    return ImmutableList.<Module>builder()
+        .add(new BackupModule(SnapshotStoreImpl.class))
+        .addAll(getModules(CLUSTER_NAME.get(), SERVERSET_PATH.get(), zkClientConfig))
+        .add(new ZooKeeperClientModule(zkClientConfig))
+        .add(configModule)
+        .add(additional)
+        .build();
+  }
+
+  /**
+   * Configures log4j when a glog configuration is available, prepares the scheduler lifecycle,
+   * passes the resulting leadership listener to the singleton service together with the
+   * registered service sockets, and then waits on the application lifecycle for shutdown.
+   *
+   * @throws IllegalStateException If no primary service is registered, or if calling
+   *     {@code lead} on the singleton service fails or is interrupted.
+   */
+  @Override
+  public void run() {
+    if (glogConfig.isPresent()) {
+      // Setup log4j to match our jul glog config in order to pick up zookeeper logging.
+      Log4jConfigurator.configureConsole(glogConfig.get());
+    } else {
+      LOG.warning("Running without expected glog configuration.");
+    }
+
+    LeadershipListener leaderListener = schedulerLifecycle.prepare();
+
+    Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
+    if (!primarySocket.isPresent()) {
+      throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
+    }
+
+    try {
+      schedulerService.lead(
+          primarySocket.get(),
+          serviceRegistry.getAuxiliarySockets(),
+          leaderListener);
+    } catch (Group.WatchException e) {
+      throw new IllegalStateException("Failed to watch group and lead service.", e);
+    } catch (Group.JoinException e) {
+      throw new IllegalStateException("Failed to join scheduler service group.", e);
+    } catch (InterruptedException e) {
+      // Catching InterruptedException clears the thread's interrupt status; restore it so that
+      // code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
+    }
+
+    appLifecycle.awaitShutdown();
+  }
+
+  /**
+   * Process entry point; delegates to the application launcher.
+   *
+   * @param args Command line arguments.
+   */
+  public static void main(String[] args) {
+    AppLauncher.launch(SchedulerMain.class, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
new file mode 100644
index 0000000..faf3269
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
+import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
+import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Random;
+import com.twitter.common.util.TruncatedBinaryBackoff;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
+import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
+import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
+import static com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+
+/**
+ * Guice bindings for the scheduler's asynchronous task management: task timeouts, task groups,
+ * task scheduling and preemption, the offer queue, and task history pruning.
+ *
+ * <p>Each component is wired inside its own {@link PrivateModule} and then exposed. The module
+ * itself is deliberately not a {@link PrivateModule}: TaskEventModule internally uses a
+ * MultiBinder, which cannot span multiple injectors, so pubsub subscribers are registered against
+ * the enclosing binder instead.
+ */
+public class AsyncModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName());
+
+  @CmdLine(name = "async_worker_threads",
+      help = "The number of worker threads to process async task operations with.")
+  private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
+
+  @CmdLine(name = "transient_task_state_timeout",
+      help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
+  private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "initial_schedule_delay",
+      help = "Initial amount of time to wait before attempting to schedule a PENDING task.")
+  private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_DELAY =
+      Arg.create(Amount.of(1L, Time.SECONDS));
+
+  @CmdLine(name = "max_schedule_delay",
+      help = "Maximum delay between attempts to schedule a PENDING tasks.")
+  private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_DELAY =
+      Arg.create(Amount.of(30L, Time.SECONDS));
+
+  @CmdLine(name = "min_offer_hold_time",
+      help = "Minimum amount of time to hold a resource offer before declining.")
+  private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
+      Arg.create(Amount.of(5, Time.MINUTES));
+
+  @CmdLine(name = "history_prune_threshold",
+      help = "Time after which the scheduler will prune terminated task history.")
+  private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
+      Arg.create(Amount.of(2L, Time.DAYS));
+
+  @CmdLine(name = "max_schedule_attempts_per_sec",
+      help = "Maximum number of scheduling attempts to make per second.")
+  private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(10D);
+
+  @CmdLine(name = "flapping_task_threshold",
+      help = "A task that repeatedly runs for less than this time is considered to be flapping.")
+  private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "initial_flapping_task_delay",
+      help = "Initial amount of time to wait before attempting to schedule a flapping task.")
+  private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
+      Arg.create(Amount.of(30L, Time.SECONDS));
+
+  @CmdLine(name = "max_flapping_task_delay",
+      help = "Maximum delay between attempts to schedule a flapping task.")
+  private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "max_reschedule_task_delay_on_startup",
+      help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
+  private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
+      Arg.create(Amount.of(30, Time.SECONDS));
+
+  @CmdLine(name = "preemption_delay",
+      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+      Arg.create(Amount.of(10L, Time.MINUTES));
+
+  @CmdLine(name = "enable_preemptor",
+      help = "Enable the preemptor and preemption")
+  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+  // Stand-in bound when preemption is disabled: never finds a preemption slot.
+  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+    @Override public Optional<String> findPreemptionSlotFor(String taskId) {
+      return Optional.absent();
+    }
+  };
+
+  @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
+      + "trying to satisfy a task preempting another.")
+  private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
+  /**
+   * Private qualifier under which the selected {@link Preemptor} is exposed.
+   */
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface PreemptionBinding { }
+
+  @VisibleForTesting
+  static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
+
+  @Override
+  protected void configure() {
+    ScheduledThreadPoolExecutor asyncExecutor = newAsyncExecutor();
+    exportExecutorStats(asyncExecutor);
+
+    installTaskTimeout(asyncExecutor);
+    PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
+
+    installTaskGroups();
+    bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
+    PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
+
+    installOfferQueue(asyncExecutor);
+    PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
+
+    installHistoryPruner(asyncExecutor);
+    PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+  }
+
+  /**
+   * Creates the executor shared by the async components, sized by {@code async_worker_threads}.
+   */
+  private static ScheduledThreadPoolExecutor newAsyncExecutor() {
+    int workerThreads = ASYNC_WORKER_THREADS.get();
+    // Clean shutdown is not a concern here: the workers are daemon threads and cleanup-free.
+    ThreadFactoryBuilder workerThreadFactory =
+        new ThreadFactoryBuilder().setNameFormat("AsyncProcessor-%d").setDaemon(true);
+    return new ScheduledThreadPoolExecutor(workerThreads, workerThreadFactory.build());
+  }
+
+  /**
+   * Exports the executor's queue size and completed task count as stats.
+   */
+  private static void exportExecutorStats(final ScheduledThreadPoolExecutor asyncExecutor) {
+    Stats.exportSize("timeout_queue_size", asyncExecutor.getQueue());
+    Stats.export(new StatImpl<Long>("async_tasks_completed") {
+      @Override public Long read() {
+        return asyncExecutor.getCompletedTaskCount();
+      }
+    });
+  }
+
+  /**
+   * Installs the private bindings backing {@link TaskTimeout} and exposes it.
+   */
+  private void installTaskTimeout(final ScheduledExecutorService asyncExecutor) {
+    binder().install(new PrivateModule() {
+      @Override protected void configure() {
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
+        bind(ScheduledExecutorService.class).toInstance(asyncExecutor);
+
+        bind(TaskTimeout.class).in(Singleton.class);
+        requireBinding(StatsProvider.class);
+        expose(TaskTimeout.class);
+      }
+    });
+  }
+
+  /**
+   * Installs the private bindings backing {@link TaskGroups}, and exposes both it and the
+   * preemptor chosen by the {@code enable_preemptor} argument.
+   */
+  private void installTaskGroups() {
+    binder().install(new PrivateModule() {
+      @Override protected void configure() {
+        TruncatedBinaryBackoff scheduleBackoff =
+            new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get());
+        RateLimiter scheduleRateLimiter = RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get());
+        bind(TaskGroupsSettings.class)
+            .toInstance(new TaskGroupsSettings(scheduleBackoff, scheduleRateLimiter));
+
+        TruncatedBinaryBackoff flappingBackoff =
+            new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get());
+        bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+            .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+                flappingBackoff,
+                FLAPPING_THRESHOLD.get(),
+                MAX_RESCHEDULING_DELAY.get()));
+
+        bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
+        if (!ENABLE_PREEMPTOR.get()) {
+          bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
+          LOG.warning("Preemptor Disabled.");
+        } else {
+          bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
+          bind(PreemptorImpl.class).in(Singleton.class);
+          LOG.info("Preemptor Enabled.");
+        }
+        expose(PREEMPTOR_KEY);
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .annotatedWith(PreemptionDelay.class)
+            .toInstance(PREEMPTION_DELAY.get());
+        bind(TaskGroups.class).in(Singleton.class);
+        expose(TaskGroups.class);
+      }
+    });
+  }
+
+  /**
+   * Installs the private bindings backing {@link OfferQueue} and exposes it.
+   */
+  private void installOfferQueue(final ScheduledExecutorService asyncExecutor) {
+    binder().install(new PrivateModule() {
+      @Override protected void configure() {
+        bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
+        bind(ScheduledExecutorService.class).toInstance(asyncExecutor);
+        bind(OfferQueue.class).to(OfferQueueImpl.class);
+        bind(OfferQueueImpl.class).in(Singleton.class);
+        expose(OfferQueue.class);
+      }
+    });
+  }
+
+  /**
+   * Installs the private bindings backing {@link HistoryPruner} and exposes it.
+   */
+  private void installHistoryPruner(final ScheduledExecutorService asyncExecutor) {
+    binder().install(new PrivateModule() {
+      @Override protected void configure() {
+        // TODO(ksweeney): Create a configuration validator module so this can be injected.
+        // TODO(William Farner): Revert this once large task counts is cheap ala hierarchical store
+        bind(Integer.class).annotatedWith(PruneThreshold.class).toInstance(100);
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .annotatedWith(PruneThreshold.class)
+            .toInstance(HISTORY_PRUNE_THRESHOLD.get());
+        bind(ScheduledExecutorService.class).toInstance(asyncExecutor);
+
+        bind(HistoryPruner.class).in(Singleton.class);
+        expose(HistoryPruner.class);
+      }
+    });
+  }
+
+  /**
+   * Binds {@link TaskScheduler} in a private module and registers it as a pubsub subscriber.
+   *
+   * <p>This is a separate method so that TaskSchedulerImplTest can exercise the wiring of
+   * TaskSchedulerImpl to the PubSub system. The signature is complex because the TaskScheduler
+   * and friends are bound in a PrivateModule, which does not interact well with the MultiBinder
+   * that backs the PubSub system.
+   *
+   * @param binder Binder to install the private module and the subscriber on.
+   * @param preemptorKey Key that {@link Preemptor} is bound to inside the private module.
+   * @param reservationDuration Value bound for the {@link ReservationDuration} amount.
+   */
+  @VisibleForTesting
+  static void bindTaskScheduler(
+      Binder binder,
+      final Key<Preemptor> preemptorKey,
+      final Amount<Long, Time> reservationDuration) {
+
+    binder.install(new PrivateModule() {
+      @Override protected void configure() {
+        bind(Preemptor.class).to(preemptorKey);
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .annotatedWith(ReservationDuration.class)
+            .toInstance(reservationDuration);
+        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+        bind(TaskSchedulerImpl.class).in(Singleton.class);
+        expose(TaskScheduler.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
+  }
+
+  /**
+   * Offer return delay made of the configured minimum hold time plus a random amount drawn from a
+   * fixed one-minute window.
+   */
+  private static class RandomJitterReturnDelay implements OfferReturnDelay {
+    private static final int JITTER_WINDOW_MS = Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS);
+
+    private final int minHoldTimeMs = MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS);
+    private final Random random = new Random.SystemRandom(new java.util.Random());
+
+    @Override public Amount<Integer, Time> get() {
+      int jitterMs = random.nextInt(JITTER_WINDOW_MS);
+      return Amount.of(minHoldTimeMs + jitterMs, Time.MILLISECONDS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
new file mode 100644
index 0000000..9af6d36
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+
+/**
+ * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
+ * transitioning into one of the inactive states.
+ *
+ * <p>Every terminated task is tracked per job in insertion order and gets a delayed deletion
+ * scheduled on the executor. Whenever a job tracks more tasks than the per-job history goal, the
+ * oldest tracked tasks of that job are deleted right away.
+ */
+public class HistoryPruner implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
+
+  @VisibleForTesting
+  static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
+
+  // Inactive task IDs per job. The backing LinkedHashMultimap keeps insertion order, so the head
+  // of each job's collection is the task that was registered first.
+  private final Multimap<IJobKey, String> tasksByJob =
+      Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
+  @VisibleForTesting
+  Multimap<IJobKey, String> getTasksByJob() {
+    return tasksByJob;
+  }
+
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final Clock clock;
+  private final long pruneThresholdMillis;
+  private final int perJobHistoryGoal;
+  // Pending time-based prune job for each tracked task ID.
+  private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
+
+  /**
+   * Binding annotation for the prune settings: the inactive time threshold and the per-job
+   * history goal.
+   */
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PruneThreshold { }
+
+  /**
+   * Creates a history pruner.
+   *
+   * @param executor Executor used to schedule the delayed prune jobs.
+   * @param storage Storage queried for inactive tasks when storage has started.
+   * @param stateManager State manager used to delete tasks.
+   * @param clock Clock used to compute how long a task has been inactive.
+   * @param inactivePruneThreshold Time after which an inactive task is pruned.
+   * @param perJobHistoryGoal Number of inactive tasks to keep tracked per job before the oldest
+   *     ones are pruned.
+   */
+  @Inject
+  HistoryPruner(
+      final ScheduledExecutorService executor,
+      final Storage storage,
+      final StateManager stateManager,
+      final Clock clock,
+      @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
+      @PruneThreshold int perJobHistoryGoal) {
+
+    this.executor = checkNotNull(executor);
+    this.storage = checkNotNull(storage);
+    this.stateManager = checkNotNull(stateManager);
+    this.clock = checkNotNull(clock);
+    this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+    this.perJobHistoryGoal = perJobHistoryGoal;
+  }
+
+  /**
+   * Computes how much longer a task may stay before it exceeds the prune threshold.
+   *
+   * @param taskEventTimestampMillis Timestamp of the task event the age is measured from.
+   * @return The prune threshold minus the time elapsed since the event, in milliseconds. Elapsed
+   *     time is never counted as negative, and the result is zero or negative once the event is
+   *     at least as old as the threshold.
+   */
+  @VisibleForTesting
+  long calculateTimeout(long taskEventTimestampMillis) {
+    return pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis);
+  }
+
+  /**
+   * When triggered, records an inactive task state change.
+   *
+   * @param change Event when a task changes state.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    if (Tasks.isTerminated(change.getNewState())) {
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
+          change.getTaskId(),
+          calculateTimeout(clock.nowMillis()));
+    }
+  }
+
+  /**
+   * When triggered, iterates through inactive tasks in the system and prunes tasks that
+   * exceed the history goal for a job or are beyond the time threshold.
+   *
+   * @param event A new StorageStarted event.
+   */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask task
+        : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
+
+      // The remaining time is measured from the timestamp of the task's last recorded event.
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
+          Tasks.id(task),
+          calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
+    }
+  }
+
+  /**
+   * Logs and deletes the given tasks through the state manager.
+   *
+   * @param taskIds IDs of the tasks to delete.
+   */
+  private void deleteTasks(Set<String> taskIds) {
+    LOG.info("Pruning inactive tasks " + taskIds);
+    stateManager.deleteTasks(taskIds);
+  }
+
+  /**
+   * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
+   *
+   * @param event A new TasksDeleted event.
+   */
+  @Subscribe
+  public void tasksDeleted(final TasksDeleted event) {
+    for (IScheduledTask task : event.getTasks()) {
+      String id = Tasks.id(task);
+      tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
+      Future<?> future = taskIdToFuture.remove(id);
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+  }
+
+  /**
+   * Starts tracking an inactive task: schedules its time-based deletion and then deletes the
+   * oldest tracked tasks of the job while the job exceeds the per-job history goal.
+   *
+   * @param jobKey Job the task belongs to.
+   * @param taskId ID of the inactive task.
+   * @param timeRemaining Delay in milliseconds before the task is pruned for age.
+   */
+  private void registerInactiveTask(
+      final IJobKey jobKey,
+      final String taskId,
+      long timeRemaining) {
+
+    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
+    // Insert the latest inactive task at the tail.
+    tasksByJob.put(jobKey, taskId);
+    Runnable runnable = new Runnable() {
+      @Override public void run() {
+        LOG.info("Pruning expired inactive task " + taskId);
+        tasksByJob.remove(jobKey, taskId);
+        taskIdToFuture.remove(taskId);
+        deleteTasks(ImmutableSet.of(taskId));
+      }
+    };
+    Future<?> pruneJob = executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS);
+    Future<?> superseded = taskIdToFuture.put(taskId, pruneJob);
+    if (superseded != null) {
+      // The task was already registered. Cancel the job being replaced, otherwise it would no
+      // longer be reachable for cancellation and would later delete the same task a second time.
+      superseded.cancel(false);
+    }
+
+    ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
+    Collection<String> tasks = tasksByJob.get(jobKey);
+    // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
+    // multimap when accessing any of its collection views".
+    synchronized (tasksByJob) {
+      Iterator<String> iterator = tasks.iterator();
+      while (tasks.size() > perJobHistoryGoal) {
+        // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
+        // order.
+        String id = iterator.next();
+        iterator.remove();
+        pruneTaskIds.add(id);
+        // The task is deleted below, so its time-based prune job is no longer needed.
+        Future<?> future = taskIdToFuture.remove(id);
+        if (future != null) {
+          future.cancel(false);
+        }
+      }
+    }
+
+    Set<String> ids = pruneTaskIds.build();
+    if (!ids.isEmpty()) {
+      deleteTasks(ids);
+    }
+  }
+}