You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:30 UTC

[37/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
new file mode 100644
index 0000000..95334ff
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.gen.HostStatus;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.Driver;
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
+import com.twitter.aurora.scheduler.state.MaintenanceController;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+import static com.twitter.aurora.gen.MaintenanceMode.NONE;
+import static com.twitter.aurora.gen.MaintenanceMode.SCHEDULED;
+
+/**
+ * Tracks the Offers currently known by the scheduler.
+ */
+public interface OfferQueue extends EventSubscriber {
+
+  /**
+   * Notifies the scheduler of a new resource offer.
+   *
+   * @param offer Newly-available resource offer.
+   */
+  void addOffer(Offer offer);
+
+  /**
+   * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
+   * tasks against the offer.
+   *
+   * @param offer Canceled offer.
+   */
+  void cancelOffer(OfferID offer);
+
+  /**
+   * Launches a task using the first offer for which {@code acceptor} returns a {@link TaskInfo}.
+   *
+   * @param acceptor Function that determines if an offer is accepted.
+   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
+   *         {@code acceptor}.
+   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
+   *                         task.
+   */
+  boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
+
+  /**
+   * Notifies the offer queue that a host has changed state.
+   *
+   * @param change State change notification.
+   */
+  void hostChangedState(HostMaintenanceStateChange change);
+
+  /**
+   * Gets the offers that the scheduler is holding.
+   *
+   * @return A snapshot of the offers that the scheduler is currently holding.
+   */
+  Iterable<Offer> getOffers();
+
+  /**
+   * Calculates the amount of time before an offer should be 'returned' by declining it.
+   * The delay is calculated for each offer that is received, so the return delay may be
+   * fixed or variable.
+   */
+  public interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
+  }
+
+  /**
+   * Thrown when there was an unexpected failure trying to launch a task.
+   */
+  static class LaunchException extends Exception {
+    LaunchException(String msg) {
+      super(msg);
+    }
+
+    LaunchException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+
+  class OfferQueueImpl implements OfferQueue {
+    private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
+
+    static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
+        // Currently, the only preference is based on host maintenance status.
+        Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
+            .onResultOf(new Function<HostOffer, MaintenanceMode>() {
+              @Override public MaintenanceMode apply(HostOffer offer) {
+                return offer.mode;
+              }
+            })
+            .compound(Ordering.arbitrary());
+
+    private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+    private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
+
+    private final Driver driver;
+    private final OfferReturnDelay returnDelay;
+    private final ScheduledExecutorService executor;
+    private final MaintenanceController maintenance;
+
+    @Inject
+    OfferQueueImpl(Driver driver,
+        OfferReturnDelay returnDelay,
+        ScheduledExecutorService executor,
+        MaintenanceController maintenance) {
+
+      this.driver = driver;
+      this.returnDelay = returnDelay;
+      this.executor = executor;
+      this.maintenance = maintenance;
+      // Potential gotcha - since this is now a ConcurrentSkipListSet, size() is more expensive.
+      // Could track this separately if it turns out to pose problems.
+      Stats.exportSize("outstanding_offers", hostOffers);
+    }
+
+    @Override
+    public void addOffer(final Offer offer) {
+      // We run a slight risk of a race here, which is acceptable.  The worst case is that we
+      // temporarily hold two offers for the same host, which should be corrected when we return
+      // them after the return delay.
+      // There's also a chance that we return an offer for compaction ~simultaneously with the
+      // same-host offer being canceled/returned.  This is also fine.
+      List<HostOffer> sameSlave = FluentIterable.from(hostOffers)
+          .filter(new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer hostOffer) {
+              return hostOffer.offer.getSlaveId().equals(offer.getSlaveId());
+            }
+          })
+          .toList();
+      if (sameSlave.isEmpty()) {
+        hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
+        executor.schedule(
+            new Runnable() {
+              @Override public void run() {
+                removeAndDecline(offer.getId());
+              }
+            },
+            returnDelay.get().as(Time.MILLISECONDS),
+            TimeUnit.MILLISECONDS);
+      } else {
+        // If there are existing offers for the slave, decline all of them so the master can
+        // compact all of those offers into a single offer and send them back.
+        LOG.info("Returning " + (sameSlave.size() + 1)
+            + " offers for " + offer.getSlaveId().getValue() + " for compaction.");
+        decline(offer.getId());
+        for (HostOffer sameSlaveOffer : sameSlave) {
+          removeAndDecline(sameSlaveOffer.offer.getId());
+        }
+      }
+    }
+
+    void removeAndDecline(OfferID id) {
+      if (removeFromHostOffers(id)) {
+        decline(id);
+      }
+    }
+
+    void decline(OfferID id) {
+      LOG.fine("Declining offer " + id);
+      driver.declineOffer(id);
+    }
+
+    @Override
+    public void cancelOffer(final OfferID offerId) {
+      removeFromHostOffers(offerId);
+    }
+
+    private boolean removeFromHostOffers(final OfferID offerId) {
+      Preconditions.checkNotNull(offerId);
+
+      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+      // on an offer, the master will mark the task as LOST and it will be retried.
+      return Iterables.removeIf(hostOffers,
+          new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer input) {
+              return input.offer.getId().equals(offerId);
+            }
+          });
+    }
+
+    @Override
+    public Iterable<Offer> getOffers() {
+      return Iterables.unmodifiableIterable(
+          FluentIterable.from(hostOffers)
+              .transform(new Function<HostOffer, Offer>() {
+                @Override public Offer apply(HostOffer offer) {
+                  return offer.offer;
+                }
+              }));
+    }
+
+    /**
+     * Updates the preference of a host's offers.
+     *
+     * @param change Host change notification.
+     */
+    @Subscribe
+    public void hostChangedState(HostMaintenanceStateChange change) {
+      final HostStatus hostStatus = change.getStatus();
+
+      // Remove and re-add a host's offers to re-sort based on its new hostStatus
+      Set<HostOffer> changedOffers = FluentIterable.from(hostOffers)
+          .filter(new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer hostOffer) {
+              return hostOffer.offer.getHostname().equals(hostStatus.getHost());
+            }
+          })
+          .toSet();
+      hostOffers.removeAll(changedOffers);
+      hostOffers.addAll(
+          FluentIterable.from(changedOffers)
+              .transform(new Function<HostOffer, HostOffer>() {
+                @Override public HostOffer apply(HostOffer hostOffer) {
+                  return new HostOffer(hostOffer.offer, hostStatus.getMode());
+                }
+              })
+              .toSet());
+    }
+
+    /**
+     * Notifies the queue that the driver is disconnected, and all the stored offers are now
+     * invalid.
+     * <p>
+     * The queue takes this as a signal to flush its queue.
+     *
+     * @param event Disconnected event.
+     */
+    @Subscribe
+    public void driverDisconnected(DriverDisconnected event) {
+      LOG.info("Clearing stale offers since the driver is disconnected.");
+      hostOffers.clear();
+    }
+
+    /**
+     * Encapsulates an offer from a host, and the host's maintenance mode.
+     */
+    private static class HostOffer {
+      private final Offer offer;
+      private final MaintenanceMode mode;
+
+      HostOffer(Offer offer, MaintenanceMode mode) {
+        this.offer = offer;
+        this.mode = mode;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof HostOffer)) {
+          return false;
+        }
+        HostOffer other = (HostOffer) o;
+        return Objects.equal(offer, other.offer) && (mode == other.mode);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hashCode(offer, mode);
+      }
+    }
+
+    @Override
+    public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
+        throws LaunchException {
+
+      // It's important that this method is not called concurrently - doing so would open up the
+      // possibility of a race between the same offers being accepted by different threads.
+
+      for (HostOffer hostOffer : hostOffers) {
+        Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
+        if (assignment.isPresent()) {
+          // Guard against an offer being removed after we grabbed it from the iterator.
+          // If that happens, the offer will not exist in hostOffers, and we can immediately
+          // send it back to LOST for quick reschedule.
+          if (hostOffers.remove(hostOffer)) {
+            try {
+              driver.launchTask(hostOffer.offer.getId(), assignment.get());
+              return true;
+            } catch (IllegalStateException e) {
+              // TODO(William Farner): Catch only the checked exception produced by Driver
+              // once it changes from throwing IllegalStateException when the driver is not yet
+              // registered.
+              throw new LaunchException("Failed to launch task.", e);
+            }
+          } else {
+            offerRaces.incrementAndGet();
+            throw new LaunchException(
+                "Accepted offer no longer exists in offer queue, likely data race.");
+          }
+        }
+      }
+
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
new file mode 100644
index 0000000..a01790c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -0,0 +1,411 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import org.apache.mesos.Protos.Offer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.PREEMPTING;
+import static com.twitter.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * Preempts active tasks in favor of higher priority tasks.
+ */
+public interface Preemptor {
+
+  /**
+   * Preempts active tasks in favor of the input task.
+   *
+   * @param taskId ID of the preempting task.
+   * @return ID of the slave where preemption occurred, or absent if no slot was found.
+   */
+  Optional<String> findPreemptionSlotFor(String taskId);
+
+  /**
+   * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
+   * priority than tasks that are currently running.
+   *
+   * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
+   * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
+   * tasks.
+   */
+  class PreemptorImpl implements Preemptor {
+
+    /**
+     * Binding annotation for the time interval after which a pending task becomes eligible to
+     * preempt other tasks.
+     */
+    @BindingAnnotation
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    @interface PreemptionDelay { }
+
+    @VisibleForTesting
+    static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
+        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
+
+    private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
+
+    private static final Function<IAssignedTask, Integer> GET_PRIORITY =
+        new Function<IAssignedTask, Integer>() {
+          @Override public Integer apply(IAssignedTask task) {
+            return task.getTask().getPriority();
+          }
+        };
+
+    private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
+    private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
+    // Incremented every time the preemptor is invoked and finds both pending and preemptable tasks.
+    private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
+    // Incremented every time we fail to find tasks to preempt for a pending task.
+    private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
+
+    private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
+            >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+      }
+    };
+
+    private final Storage storage;
+    private final SchedulerCore scheduler;
+    private final OfferQueue offerQueue;
+    private final SchedulingFilter schedulingFilter;
+    private final Amount<Long, Time> preemptionCandidacyDelay;
+    private final Clock clock;
+
+    /**
+     * Creates a new preemptor.
+     *
+     * @param storage Backing store for tasks.
+     * @param scheduler Scheduler to fetch task information from, and instruct when preempting
+     *                  tasks.
+     * @param offerQueue Queue that contains available Mesos resource offers.
+     * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
+     * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
+     *                                 tasks.
+     * @param clock Clock to check current time.
+     */
+    @Inject
+    PreemptorImpl(
+        Storage storage,
+        SchedulerCore scheduler,
+        OfferQueue offerQueue,
+        SchedulingFilter schedulingFilter,
+        @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+        Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.scheduler = checkNotNull(scheduler);
+      this.offerQueue = checkNotNull(offerQueue);
+      this.schedulingFilter = checkNotNull(schedulingFilter);
+      this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
+      this.clock = checkNotNull(clock);
+    }
+
+    private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
+      return Lists.newArrayList(Iterables.transform(Iterables.filter(
+          Storage.Util.consistentFetchTasks(storage, query), filter),
+          SCHEDULED_TO_ASSIGNED));
+    }
+
+    private List<IAssignedTask> fetch(Query.Builder query) {
+      return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
+    }
+
+    private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
+        new Function<IAssignedTask, String>() {
+          @Override public String apply(IAssignedTask input) {
+            return input.getSlaveId();
+          }
+        };
+
+    private static final Function<IAssignedTask, String> TASK_TO_HOST =
+        new Function<IAssignedTask, String>() {
+          @Override public String apply(IAssignedTask input) {
+            return input.getSlaveHost();
+          }
+        };
+
+    private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
+      return new Predicate<IAssignedTask>() {
+        @Override public boolean apply(IAssignedTask possibleVictim) {
+          return preemptionFilter(possibleVictim).apply(pending);
+        }
+      };
+    }
+
+    private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
+        new Function<IAssignedTask, ResourceSlot>() {
+          @Override public ResourceSlot apply(IAssignedTask input) {
+            return ResourceSlot.from(input.getTask());
+          }
+        };
+
+    private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<Offer, ResourceSlot>() {
+          @Override public ResourceSlot apply(Offer offer) {
+            return ResourceSlot.from(offer);
+          }
+        };
+
+    private static final Function<Offer, String> OFFER_TO_HOST =
+        new Function<Offer, String>() {
+          @Override public String apply(Offer offer) {
+            return offer.getHostname();
+          }
+        };
+
+    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+    // ordering
+    private static final Ordering<IAssignedTask> RESOURCE_ORDER =
+        ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+
+    /**
+     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
+     * The empty set indicates the offers (slack) are enough.
+     * A set with elements indicates those tasks and the offers are enough.
+     */
+    private Optional<Set<IAssignedTask>> getTasksToPreempt(
+        Iterable<IAssignedTask> possibleVictims,
+        Iterable<Offer> offers,
+        IAssignedTask pendingTask) {
+
+      // This enforces the precondition that all of the resources are from the same host. We need to
+      // get the host for the schedulingFilter.
+      Set<String> hosts = ImmutableSet.<String>builder()
+          .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
+          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
+
+      String host = Iterables.getOnlyElement(hosts);
+
+      ResourceSlot slackResources =
+          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
+
+      if (!Iterables.isEmpty(offers)) {
+        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+            slackResources,
+            host,
+            pendingTask.getTask(),
+            pendingTask.getTaskId());
+
+        if (vetos.isEmpty()) {
+          return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
+        }
+      }
+
+      FluentIterable<IAssignedTask> preemptableTasks =
+          FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
+
+      if (preemptableTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+
+      Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+
+      for (IAssignedTask victim : sortedVictims) {
+        toPreemptTasks.add(victim);
+
+        ResourceSlot totalResource = ResourceSlot.sum(
+            ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+            slackResources);
+
+        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+            totalResource,
+            host,
+            pendingTask.getTask(),
+            pendingTask.getTaskId());
+
+        if (vetos.isEmpty()) {
+          return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
+        }
+      }
+      return Optional.absent();
+    }
+
+    private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
+        new Function<Offer, String>() {
+          @Override public String apply(Offer offer) {
+            return offer.getSlaveId().getValue();
+          }
+        };
+
+    private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+      // Only non-pending active tasks may be preempted.
+      List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
+
+      // Walk through the preemption candidates in reverse scheduling order.
+      Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
+
+      // Group the tasks by slave id so they can be paired with offers from the same slave.
+      return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+    }
+
+    @Override
+    public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
+      List<IAssignedTask> pendingTasks =
+          fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
+
+      // Task is no longer PENDING or has not passed the candidacy delay; no need to preempt.
+      if (pendingTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
+
+      Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
+
+      if (slavesToActiveTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      attemptedPreemptions.incrementAndGet();
+
+      // Group the offers by slave id so they can be paired with active tasks from the same slave.
+      Multimap<String, Offer> slavesToOffers =
+          Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
+
+      Set<String> allSlaves = ImmutableSet.<String>builder()
+          .addAll(slavesToOffers.keySet())
+          .addAll(slavesToActiveTasks.keySet())
+          .build();
+
+      for (String slaveID : allSlaves) {
+        Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
+            slavesToActiveTasks.get(slaveID),
+            slavesToOffers.get(slaveID),
+            pendingTask);
+
+        if (toPreemptTasks.isPresent()) {
+          try {
+            for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+              scheduler.preemptTask(toPreempt, pendingTask);
+              tasksPreempted.incrementAndGet();
+            }
+            return Optional.of(slaveID);
+          } catch (ScheduleException e) {
+            LOG.log(Level.SEVERE, "Preemption failed", e);
+            failedPreemptions.incrementAndGet();
+          }
+        }
+      }
+
+      noSlotsFound.incrementAndGet();
+      return Optional.absent();
+    }
+
+    private static final Predicate<IAssignedTask> IS_PRODUCTION =
+        Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
+
+    /**
+     * Creates a static filter that will identify tasks that may preempt the provided task.
+     * A task may preempt another task if either of the following conditions holds true:
+     * - The tasks are owned by the same role and the priority of {@code preemptableTask} is lower
+     *     OR {@code preemptableTask} is non-production and the compared task is production.
+     * Resource sufficiency is not evaluated by this filter.
+     *
+     * @param preemptableTask Task to possibly preempt.
+     * @return A filter that will compare the priorities and production status of other tasks
+     *     with {@code preemptableTask}.
+     */
+    private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
+      Predicate<IAssignedTask> preemptableIsProduction = preemptableTask.getTask().isProduction()
+          ? Predicates.<IAssignedTask>alwaysTrue()
+          : Predicates.<IAssignedTask>alwaysFalse();
+
+      Predicate<IAssignedTask> priorityFilter =
+          greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
+      return Predicates.or(
+          Predicates.and(Predicates.not(preemptableIsProduction), IS_PRODUCTION),
+          Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter)
+      );
+    }
+
+    private static Predicate<IAssignedTask> isOwnedBy(final String role) {
+      return new Predicate<IAssignedTask>() {
+        @Override public boolean apply(IAssignedTask task) {
+          return getRole(task).equals(role);
+        }
+      };
+    }
+
+    private static String getRole(IAssignedTask task) {
+      return task.getTask().getOwner().getRole();
+    }
+
+    private static Predicate<Integer> greaterThan(final int value) {
+      return new Predicate<Integer>() {
+        @Override public boolean apply(Integer input) {
+          return input > value;
+        }
+      };
+    }
+
+    private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
+      return Predicates.compose(greaterThan(priority), GET_PRIORITY);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
new file mode 100644
index 0000000..eefc03a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+
+/**
+ * Calculates scheduling delays for tasks.
+ */
+interface RescheduleCalculator {
+  /**
+   * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getStartupReadyTimeMs(IScheduledTask task);
+
+  /**
+   * Gets a timestamp for the task to become eligible for (re)scheduling.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getReadyTimeMs(IScheduledTask task);
+
+  /**
+   * Calculator that delays a task when its ancestors flapped.  The delay grows through the
+   * configured backoff strategy for every consecutive flapping ancestor found in storage.
+   */
+  class RescheduleCalculatorImpl implements RescheduleCalculator {
+
+    private static final Logger LOG = Logger.getLogger(RescheduleCalculatorImpl.class.getName());
+
+    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
+        Predicates.in(Tasks.ACTIVE_STATES);
+
+    private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
+        new Function<ITaskEvent, ScheduleStatus>() {
+          @Override public ScheduleStatus apply(ITaskEvent input) {
+            return input.getStatus();
+          }
+        };
+
+    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
+        EnumSet.of(RESTARTING, KILLING);
+
+    private static final Predicate<ScheduleStatus> IS_INTERRUPTED_STATUS =
+        Predicates.in(INTERRUPTED_TASK_STATES);
+
+    private final Storage storage;
+    private final RescheduleCalculatorSettings settings;
+    private final Clock clock;
+    private final Random random = new Random.SystemRandom(new java.util.Random());
+
+    /**
+     * Decides whether a task flapped: the time between the most recent event with an active
+     * status and the last recorded event is below the configured threshold.
+     * <p>
+     * Assumes task events are recorded oldest-first, so the reversed list starts with the
+     * final event.  Throws if the task has an empty event list, if its last event is not
+     * terminal, or if it has no event with an active status.
+     */
+    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        if (!task.isSetTaskEvents()) {
+          return false;
+        }
+
+        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
+
+        // Avoid penalizing tasks that were interrupted by outside action, such as a user
+        // restarting them.
+        if (Iterables.any(Iterables.transform(events, TO_STATUS), IS_INTERRUPTED_STATUS)) {
+          return false;
+        }
+
+        ITaskEvent terminalEvent = Iterables.get(events, 0);
+        ScheduleStatus terminalState = terminalEvent.getStatus();
+        Preconditions.checkState(Tasks.isTerminated(terminalState));
+
+        // First match in the reversed list, i.e. the latest event with an active status.
+        ITaskEvent activeEvent =
+            Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+
+        long thresholdMs = settings.flappingTaskThreshold.as(Time.MILLISECONDS);
+
+        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
+      }
+    };
+
+    /**
+     * Settings consumed by {@link RescheduleCalculatorImpl}.
+     */
+    static class RescheduleCalculatorSettings {
+      private final BackoffStrategy flappingTaskBackoff;
+      private final Amount<Long, Time> flappingTaskThreshold;
+      private final Amount<Integer, Time>  maxStartupRescheduleDelay;
+
+      /**
+       * @param flappingTaskBackoff Strategy used to grow the penalty per flapping ancestor.
+       * @param flappingTaskThreshold Active-to-terminal time below which a task flapped.
+       * @param maxStartupRescheduleDelay Exclusive upper bound on the random startup delay.
+       */
+      RescheduleCalculatorSettings(
+          BackoffStrategy flappingTaskBackoff,
+          Amount<Long, Time> flappingTaskThreshold,
+          Amount<Integer, Time> maxStartupRescheduleDelay) {
+
+        this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
+        this.flappingTaskThreshold = checkNotNull(flappingTaskThreshold);
+        this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
+      }
+    }
+
+    @Inject
+    RescheduleCalculatorImpl(
+        Storage storage,
+        RescheduleCalculatorSettings settings,
+        Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.settings = checkNotNull(settings);
+      this.clock = checkNotNull(clock);
+    }
+
+    @Override
+    public long getStartupReadyTimeMs(IScheduledTask task) {
+      int maxDelayMs = settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS);
+      // A zero (or negative) maximum means no startup jitter.  The random source presumably
+      // delegates to java.util.Random#nextInt(int), which rejects non-positive bounds.
+      long jitterMs = maxDelayMs > 0 ? random.nextInt(maxDelayMs) : 0;
+      return jitterMs + getTaskReadyTimestamp(task);
+    }
+
+    @Override
+    public long getReadyTimeMs(IScheduledTask task) {
+      return getTaskReadyTimestamp(task);
+    }
+
+    /**
+     * Looks up the ancestor of a task in storage.
+     *
+     * @param task Task whose ancestor should be fetched.
+     * @return The ancestor, or absent if the task has no ancestor ID or the ancestor was not
+     *     found in storage.
+     */
+    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
+      if (!task.isSetAncestorId()) {
+        return Optional.absent();
+      }
+
+      ImmutableSet<IScheduledTask> res =
+          Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+
+      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+    }
+
+    /**
+     * Walks the ancestor chain for as long as each ancestor flapped, feeding the running
+     * penalty back into the backoff strategy, and adds the result to the current time.
+     *
+     * @param task Task to calculate the ready time for.
+     * @return Current time plus the accumulated flapping penalty, in msec.
+     */
+    private long getTaskReadyTimestamp(IScheduledTask task) {
+      Optional<IScheduledTask> curTask = getTaskAncestor(task);
+      long penaltyMs = 0;
+      while (curTask.isPresent() && flapped.apply(curTask.get())) {
+        LOG.info(
+            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
+        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
+        // If the backoff strategy is truncated then there is no need for us to continue.
+        if (newPenalty == penaltyMs) {
+          break;
+        }
+        penaltyMs = newPenalty;
+        curTask = getTaskAncestor(curTask.get());
+      }
+
+      return penaltyMs + clock.nowMillis();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
new file mode 100644
index 0000000..018022b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.scheduler.async.TaskGroups.GroupKey;
+import com.twitter.common.base.Function;
+import com.twitter.common.util.BackoffStrategy;
+
+/**
+ * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
+ */
+class TaskGroup {
+  private final GroupKey key;
+  private final BackoffStrategy backoffStrategy;
+
+  private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
+    @Override public Long apply(Task item) {
+      return item.readyTimestampMs;
+    }
+  };
+
+  // Order the tasks by the time they are ready to be scheduled
+  private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
+  // 11 is the magic number used by PriorityBlockingQueue as the initial size.
+  private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
+  // Penalty for the task group for failing to schedule.
+  private final AtomicLong penaltyMs;
+
+  TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
+    this.key = key;
+    this.backoffStrategy = backoffStrategy;
+    penaltyMs = new AtomicLong();
+    resetPenaltyAndGet();
+  }
+
+  GroupKey getKey() {
+    return key;
+  }
+
+  private static final Function<Task, String> TO_TASK_ID =
+      new Function<Task, String>() {
+        @Override public String apply(Task item) {
+          return item.taskId;
+        }
+      };
+
+  /**
+   * Removes the task at the head of the queue.
+   *
+   * @return String the id of the head task.
+   * @throws IllegalStateException if the queue is empty.
+   */
+  String pop() throws IllegalStateException {
+    Task head = tasks.poll();
+    Preconditions.checkState(head != null);
+    return head.taskId;
+  }
+
+  void remove(String taskId) {
+    Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
+  }
+
+  void push(final String taskId, long readyTimestamp) {
+    tasks.offer(new Task(taskId, readyTimestamp));
+  }
+
+  synchronized long resetPenaltyAndGet() {
+    penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
+    return getPenaltyMs();
+  }
+
+  synchronized long penalizeAndGet() {
+    penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
+    return getPenaltyMs();
+  }
+
+  GroupState isReady(long nowMs) {
+    Task task = tasks.peek();
+    if (task == null) {
+      return GroupState.EMPTY;
+    }
+
+    if (task.readyTimestampMs > nowMs) {
+      return GroupState.NOT_READY;
+    }
+    return GroupState.READY;
+  }
+  // Begin methods used for debug interfaces.
+
+  public String getName() {
+    return key.toString();
+  }
+
+  // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
+  public Set<String> getTaskIds() {
+    return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
+  }
+
+  public long getPenaltyMs() {
+    return penaltyMs.get();
+  }
+
+  private static class Task {
+    private final String taskId;
+    private final long readyTimestampMs;
+
+    Task(String taskId, long readyTimestampMs) {
+      this.taskId = Preconditions.checkNotNull(taskId);
+      this.readyTimestampMs = readyTimestampMs;
+    }
+  }
+
+  enum GroupState {
+    EMPTY,      // The group is empty.
+    NOT_READY,  // Every task in the group is not ready yet.
+    READY       // The task at the head of the queue is ready.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
new file mode 100644
index 0000000..a59e5c8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
+
+/**
+ * A collection of task groups, where a task group is a collection of tasks that are known to be
+ * equal in the way they schedule. This is expected to be tasks associated with the same job key,
+ * that also have {@code equals()}-equal {@link ITaskConfig} values.
+ * <p>
+ * This is used to prevent redundant work in trying to schedule tasks as well as to provide
+ * nearly-equal responsiveness when scheduling across jobs.  In other words, a 1000 instance job
+ * cannot starve a 1 instance job.
+ */
+public class TaskGroups implements EventSubscriber {
+
+  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
+
+  private final Storage storage;
+  private final LoadingCache<GroupKey, TaskGroup> groups;
+  private final Clock clock;
+  private final RescheduleCalculator rescheduleCalculator;
+
+  static class TaskGroupsSettings {
+    private final BackoffStrategy taskGroupBackoff;
+    private final RateLimiter rateLimiter;
+
+    TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
+      this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
+      this.rateLimiter = checkNotNull(rateLimiter);
+    }
+  }
+
+  @Inject
+  TaskGroups(
+      ShutdownRegistry shutdownRegistry,
+      Storage storage,
+      TaskGroupsSettings settings,
+      TaskScheduler taskScheduler,
+      Clock clock,
+      RescheduleCalculator rescheduleCalculator) {
+
+    this(
+        createThreadPool(shutdownRegistry),
+        storage,
+        settings.taskGroupBackoff,
+        settings.rateLimiter,
+        taskScheduler,
+        clock,
+        rescheduleCalculator);
+  }
+
+  TaskGroups(
+      final ScheduledExecutorService executor,
+      final Storage storage,
+      final BackoffStrategy taskGroupBackoffStrategy,
+      final RateLimiter rateLimiter,
+      final TaskScheduler taskScheduler,
+      final Clock clock,
+      final RescheduleCalculator rescheduleCalculator) {
+
+    this.storage = checkNotNull(storage);
+    checkNotNull(executor);
+    checkNotNull(taskGroupBackoffStrategy);
+    checkNotNull(rateLimiter);
+    checkNotNull(taskScheduler);
+    this.clock = checkNotNull(clock);
+    this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+
+    final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
+      @Override public TaskSchedulerResult schedule(String taskId) {
+        rateLimiter.acquire();
+        return taskScheduler.schedule(taskId);
+      }
+    };
+
+    groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
+      @Override public TaskGroup load(GroupKey key) {
+        TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
+        LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
+        startGroup(group, executor, ratelLimitedScheduler);
+        return group;
+      }
+    });
+  }
+
+  private synchronized boolean maybeInvalidate(TaskGroup group) {
+    if (group.getTaskIds().isEmpty()) {
+      groups.invalidate(group.getKey());
+      return true;
+    }
+    return false;
+  }
+
+  private void startGroup(
+      final TaskGroup group,
+      final ScheduledExecutorService executor,
+      final TaskScheduler taskScheduler) {
+
+    Runnable monitor = new Runnable() {
+      @Override public void run() {
+        GroupState state = group.isReady(clock.nowMillis());
+
+        switch (state) {
+          case EMPTY:
+            maybeInvalidate(group);
+            break;
+
+          case READY:
+            String id = group.pop();
+            TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
+            switch (result) {
+              case SUCCESS:
+                if (!maybeInvalidate(group)) {
+                  executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
+                }
+                break;
+
+              case TRY_AGAIN:
+                group.push(id, clock.nowMillis());
+                executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
+                break;
+
+              default:
+                throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
+            }
+            break;
+
+          case NOT_READY:
+            executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+            break;
+
+          default:
+            throw new IllegalStateException("Unknown GroupState " + state);
+        }
+      }
+    };
+    executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+  }
+
+  private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
+    // TODO(William Farner): Leverage ExceptionHandlingScheduledExecutorService:
+    // com.twitter.common.util.concurrent.ExceptionHandlingScheduledExecutorService
+    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+        1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskScheduler-%d").build());
+    Stats.exportSize("schedule_queue_size", executor.getQueue());
+    shutdownRegistry.addAction(new Command() {
+      @Override public void execute() {
+        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+      }
+    });
+    return executor;
+  }
+
+  private synchronized void add(IAssignedTask task, long readyTimestamp) {
+    groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
+  }
+
+  /**
+   * Informs the task groups of a task state change.
+   * <p>
+   * This is used to observe {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks and begin
+   * attempting to schedule them.
+   *
+   * @param stateChange State change notification.
+   */
+  @Subscribe
+  public synchronized void taskChangedState(TaskStateChange stateChange) {
+    if (stateChange.getNewState() == PENDING) {
+      add(
+          stateChange.getTask().getAssignedTask(),
+          rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
+    }
+  }
+
+  /**
+   * Signals that storage has started and is consistent.
+   * <p>
+   * Upon this signal, all {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks in the stoage
+   * will become eligible for scheduling.
+   *
+   * @param event Storage started notification.
+   */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask task
+        : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
+
+      add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
+    }
+  }
+
+  /**
+   * Signals the scheduler that tasks have been deleted.
+   *
+   * @param deleted Tasks deleted event.
+   */
+  @Subscribe
+  public synchronized void tasksDeleted(TasksDeleted deleted) {
+    for (IAssignedTask task
+        : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
+      TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
+      if (group != null) {
+        group.remove(task.getTaskId());
+      }
+    }
+  }
+
+  public Iterable<TaskGroup> getGroups() {
+    return ImmutableSet.copyOf(groups.asMap().values());
+  }
+
+  static class GroupKey {
+    private final ITaskConfig canonicalTask;
+
+    GroupKey(ITaskConfig task) {
+      this.canonicalTask = task;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(canonicalTask);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof GroupKey)) {
+        return false;
+      }
+      GroupKey other = (GroupKey) o;
+      return Objects.equal(canonicalTask, other.canonicalTask);
+    }
+
+    @Override
+    public String toString() {
+      return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
new file mode 100644
index 0000000..0ad9e13
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.BindingAnnotation;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.state.TaskAssigner;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.LOST;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+
+/**
+ * Enables scheduling and preemption of tasks.
+ */
+interface TaskScheduler extends EventSubscriber {
+
+  /**
+   * Attempts to schedule a task, possibly performing irreversible actions.
+   *
+   * @param taskId The task to attempt to schedule.
+   * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
+   * again if TRY_AGAIN is returned.
+   */
+  TaskSchedulerResult schedule(String taskId);
+
+  enum TaskSchedulerResult {
+    SUCCESS,
+    TRY_AGAIN
+  }
+
+  /**
+   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
+   * backs off after a failed scheduling attempt.
+   * <p>
+   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+   */
+  class TaskSchedulerImpl implements TaskScheduler {
+    /**
+     * Binding annotation for the time duration of reservations.
+     */
+    @BindingAnnotation
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    @interface ReservationDuration { }
+
+    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
+
+    private final Storage storage;
+    private final StateManager stateManager;
+    private final TaskAssigner assigner;
+    private final OfferQueue offerQueue;
+    private final Preemptor preemptor;
+    private final Reservations reservations;
+
+    private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
+    private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
+
+    @Inject
+    TaskSchedulerImpl(
+        Storage storage,
+        StateManager stateManager,
+        TaskAssigner assigner,
+        OfferQueue offerQueue,
+        Preemptor preemptor,
+        @ReservationDuration Amount<Long, Time> reservationDuration,
+        final Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.stateManager = checkNotNull(stateManager);
+      this.assigner = checkNotNull(assigner);
+      this.offerQueue = checkNotNull(offerQueue);
+      this.preemptor = checkNotNull(preemptor);
+      this.reservations = new Reservations(reservationDuration, clock);
+    }
+
+    /**
+     * Builds the function that tries to assign {@code task} to an offer, refusing offers from
+     * slaves that are reserved for a different task.
+     *
+     * @param taskId ID of the task being scheduled.
+     * @param task The task being scheduled.
+     * @return A function yielding the assignment for an offer, or absent if there is none.
+     */
+    private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+        final String taskId,
+        final IScheduledTask task) {
+
+      return new Function<Offer, Optional<TaskInfo>>() {
+        @Override public Optional<TaskInfo> apply(Offer offer) {
+          Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
+          if (reservedTaskId.isPresent() && !taskId.equals(reservedTaskId.get())) {
+            // Slave is reserved for another task.
+            return Optional.absent();
+          }
+          // Slave is not reserved, or is reserved to satisfy this task.
+          return assigner.maybeAssign(offer, task);
+        }
+      };
+    }
+
+    @VisibleForTesting
+    static final Optional<String> LAUNCH_FAILED_MSG =
+        Optional.of("Unknown exception attempting to schedule task.");
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * SUCCESS is also returned when no PENDING task with the given ID is found, and when the
+     * launch fails and the task is moved to LOST; neither case calls for a retry.
+     */
+    @Timed("task_schedule_attempt")
+    @Override
+    public TaskSchedulerResult schedule(final String taskId) {
+      scheduleAttemptsFired.incrementAndGet();
+      try {
+        return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
+          @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
+            LOG.fine("Attempting to schedule task " + taskId);
+            Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
+            final IScheduledTask task =
+                Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+            if (task == null) {
+              LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+            } else {
+              try {
+                if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+                  // Task could not be scheduled.
+                  maybePreemptFor(taskId);
+                  return TaskSchedulerResult.TRY_AGAIN;
+                }
+              } catch (OfferQueue.LaunchException e) {
+                LOG.log(Level.WARNING, "Failed to launch task.", e);
+                scheduleAttemptsFailed.incrementAndGet();
+
+                // The attempt to schedule the task failed, so we need to backpedal on the
+                // assignment.
+                // It is in the LOST state and a new task will move to PENDING to replace it.
+                // Should the state change fail due to storage issues, that's okay.  The task will
+                // time out in the ASSIGNED state and be moved to LOST.
+                stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+              }
+            }
+
+            return TaskSchedulerResult.SUCCESS;
+          }
+        });
+      } catch (RuntimeException e) {
+        // We catch the generic unchecked exception here to ensure tasks are not abandoned
+        // if there is a transient issue resulting in an unchecked exception.
+        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
+        scheduleAttemptsFailed.incrementAndGet();
+        return TaskSchedulerResult.TRY_AGAIN;
+      }
+    }
+
+    /**
+     * Asks the preemptor for a slot for the task and, if one is found, reserves that slave
+     * for the task.  Does nothing when the task already holds a reservation.
+     *
+     * @param taskId ID of the task that could not be scheduled.
+     */
+    private void maybePreemptFor(String taskId) {
+      if (reservations.hasReservationForTask(taskId)) {
+        return;
+      }
+      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+      if (slaveId.isPresent()) {
+        this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
+      }
+    }
+
+    /**
+     * Releases the reservation held for a task once it leaves the PENDING state.
+     *
+     * @param stateChangeEvent State change notification.
+     */
+    @Subscribe
+    public void taskChanged(final TaskStateChange stateChangeEvent) {
+      if (stateChangeEvent.getOldState() == PENDING) {
+        reservations.invalidateTask(stateChangeEvent.getTaskId());
+      }
+    }
+
+    /**
+     * Slave reservations, keyed by slave ID and valued by the ID of the task the slave is
+     * reserved for.  Entries expire a fixed duration after they are written.
+     */
+    private static class Reservations {
+      private final Cache<SlaveID, String> reservations;
+
+      Reservations(final Amount<Long, Time> duration, final Clock clock) {
+        checkNotNull(duration);
+        checkNotNull(clock);
+        // Expire in milliseconds: converting to whole minutes would lose sub-minute precision,
+        // and a duration shorter than one minute could become zero.
+        this.reservations = CacheBuilder.newBuilder()
+            .expireAfterWrite(duration.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)
+            .ticker(new Ticker() {
+              @Override public long read() {
+                return clock.nowNanos();
+              }
+            })
+            .build();
+        Stats.export(new StatImpl<Long>("reservation_cache_size") {
+          @Override public Long read() {
+            return reservations.size();
+          }
+        });
+      }
+
+      private synchronized void add(SlaveID slaveId, String taskId) {
+        reservations.put(slaveId, taskId);
+      }
+
+      private synchronized boolean hasReservationForTask(String taskId) {
+        return reservations.asMap().containsValue(taskId);
+      }
+
+      private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) {
+        return Optional.fromNullable(reservations.getIfPresent(slaveID));
+      }
+
+      private synchronized void invalidateTask(String taskId) {
+        reservations.asMap().values().remove(taskId);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
new file mode 100644
index 0000000..19848c7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
+ * tasks will be transitioned to the LOST state.
+ *
+ * <p>A timeout is scheduled whenever a task enters one of {@link #TRANSIENT_STATES} (and, when
+ * storage starts, for every task already in one of those states).  The timeout is canceled when
+ * the task leaves the state it was registered for.
+ */
+class TaskTimeout implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
+
+  @VisibleForTesting
+  static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
+
+  @VisibleForTesting
+  static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
+
+  @VisibleForTesting
+  static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
+
+  @VisibleForTesting
+  static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
+      ScheduleStatus.ASSIGNED,
+      ScheduleStatus.PREEMPTING,
+      ScheduleStatus.RESTARTING,
+      ScheduleStatus.KILLING);
+
+  @VisibleForTesting
+  static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
+
+  // Pending timeouts, keyed by the (task, state) pair that each timeout guards.
+  private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
+
+  /**
+   * Identifies a pending timeout: a task ID paired with the transient state it was observed in.
+   */
+  private static final class TimeoutKey {
+    private final String taskId;
+    private final ScheduleStatus status;
+
+    private TimeoutKey(String taskId, ScheduleStatus status) {
+      this.taskId = taskId;
+      this.status = status;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskId, status);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TimeoutKey)) {
+        return false;
+      }
+      TimeoutKey key = (TimeoutKey) o;
+      return Objects.equal(taskId, key.taskId)
+          && (status == key.status);
+    }
+
+    @Override
+    public String toString() {
+      return taskId + ":" + status;
+    }
+  }
+
+  private final Storage storage;
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+  private final long timeoutMillis;
+  private final Clock clock;
+  private final AtomicLong timedOutTasks;
+
+  /**
+   * Creates a task timeout monitor and registers its stats with the given provider.
+   *
+   * @param storage Storage queried for tasks in transient states when storage starts.
+   * @param executor Executor on which timeout handlers are scheduled.
+   * @param stateManager State manager used to move timed-out tasks to LOST.
+   * @param clock Clock used to timestamp registered timeouts and compute waiting times.
+   * @param timeout Time a task may remain in a transient state before it is timed out.
+   * @param statsProvider Provider used to create the counter and gauges exported by this class.
+   */
+  @Inject
+  TaskTimeout(
+      Storage storage,
+      ScheduledExecutorService executor,
+      StateManager stateManager,
+      final Clock clock,
+      Amount<Long, Time> timeout,
+      StatsProvider statsProvider) {
+
+    this.storage = checkNotNull(storage);
+    this.executor = checkNotNull(executor);
+    this.stateManager = checkNotNull(stateManager);
+    this.timeoutMillis = checkNotNull(timeout).as(Time.MILLISECONDS);
+    this.clock = checkNotNull(clock);
+    checkNotNull(statsProvider);
+    this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
+
+    exportStats(statsProvider);
+  }
+
+  /**
+   * Schedules a timeout handler for the key unless one is already registered.
+   *
+   * @param key Task and state to guard with a timeout.
+   */
+  private void registerTimeout(TimeoutKey key) {
+    // This is an obvious check-then-act, but:
+    //   - there isn't much of a better option, given that we have to get the Future before
+    //     inserting into the map
+    //   - a key collision only happens in practice if something is wrong externally to this class
+    //     (double event for the same state)
+    //   - the outcome is low-risk, we would wind up with a redundant Future that will eventually
+    //     no-op
+    if (!futures.containsKey(key)) {
+      Future<?> timeoutHandler = executor.schedule(
+          new TimedOutTaskHandler(key),
+          timeoutMillis,
+          TimeUnit.MILLISECONDS);
+      futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
+    }
+  }
+
+  private static boolean isTransient(ScheduleStatus status) {
+    return TRANSIENT_STATES.contains(status);
+  }
+
+  /**
+   * Cancels the timeout guarding the state a task just left (if that state was transient), and
+   * registers a new timeout if the state it entered is transient.
+   *
+   * @param change Task state transition.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    String taskId = change.getTaskId();
+    ScheduleStatus newState = change.getNewState();
+    if (isTransient(change.getOldState())) {
+      TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState());
+      Context context = futures.remove(oldKey);
+      if (context != null) {
+        LOG.fine("Canceling state timeout for task " + oldKey);
+        // Do not interrupt a handler that has already started running.
+        context.future.cancel(false);
+      }
+    }
+
+    if (isTransient(newState)) {
+      registerTimeout(new TimeoutKey(taskId, newState));
+    }
+  }
+
+  /**
+   * Registers timeouts for every stored task that is currently in a transient state.
+   *
+   * @param event Storage started notification.
+   */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
+      registerTimeout(new TimeoutKey(Tasks.id(task), task.getStatus()));
+    }
+  }
+
+  /**
+   * Scheduled work that moves a task to LOST if it is still in the state the timeout guards.
+   */
+  private class TimedOutTaskHandler implements Runnable {
+    private final TimeoutKey key;
+
+    TimedOutTaskHandler(TimeoutKey key) {
+      this.key = key;
+    }
+
+    @Override public void run() {
+      Context context = futures.get(key);
+      try {
+        if (context == null) {
+          LOG.warning("Timeout context not found for " + key);
+          return;
+        }
+
+        LOG.info("Timeout reached for task " + key);
+        // This query acts as a CAS by including the state that we expect the task to be in if the
+        // timeout is still valid.  Ideally, the future would have already been canceled, but in the
+        // event of a state transition race, including transientState prevents an unintended
+        // task timeout.
+        Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
+        // Note: This requires LOST transitions trigger Driver.killTask.
+        if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+          timedOutTasks.incrementAndGet();
+        } else {
+          LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
+        }
+      } finally {
+        // The timeout has been handled (or found stale) either way; stop tracking it.
+        futures.remove(key);
+      }
+    }
+  }
+
+  /**
+   * Bookkeeping for a registered timeout: when it was registered and the handle used to cancel
+   * it.  Static because it needs no reference to the enclosing instance.
+   */
+  private static final class Context {
+    private final long timestampMillis;
+    private final Future<?> future;
+
+    Context(long timestampMillis, Future<?> future) {
+      this.timestampMillis = timestampMillis;
+      this.future = future;
+    }
+  }
+
+  private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
+    @Override public Long apply(Context context) {
+      return context.timestampMillis;
+    }
+  };
+
+  // Orders contexts from the earliest registration time to the latest.
+  private static final Ordering<Context> TIMESTAMP_ORDER =
+      Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
+
+  /**
+   * Builds the name of the gauge reporting the longest current wait in the given state.
+   *
+   * @param status State the gauge reports on.
+   * @return The stat name for that state.
+   */
+  @VisibleForTesting
+  static String waitingTimeStatName(ScheduleStatus status) {
+    return "scheduler_max_" + status + "_waiting_ms";
+  }
+
+  /**
+   * Exports a gauge with the number of pending timeouts, plus one gauge per transient state with
+   * the age in milliseconds of the oldest pending timeout for that state (0 when there is none).
+   *
+   * @param statsProvider Provider to register the gauges with.
+   */
+  private void exportStats(StatsProvider statsProvider) {
+    statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
+      @Override public Number get() {
+        return futures.size();
+      }
+    });
+
+    for (final ScheduleStatus status : TRANSIENT_STATES) {
+      statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
+        private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
+          @Override public boolean apply(TimeoutKey key) {
+            return key.status == status;
+          }
+        };
+
+        @Override public Number get() {
+          Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
+          // 'matches' is a live view of a concurrently-modified map.  Checking for emptiness and
+          // then asking for the minimum could fail if the last match is removed in between, so
+          // extract the oldest context with a call that tolerates an empty view.
+          Context oldest = Iterables.getFirst(TIMESTAMP_ORDER.leastOf(matches, 1), null);
+          if (oldest == null) {
+            return 0L;
+          } else {
+            return clock.nowMillis() - oldest.timestampMillis;
+          }
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java b/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
new file mode 100644
index 0000000..b11c683
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.CommandInfo.URI;
+
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * Static helpers that build {@link CommandInfo} objects from an executor URI.
+ */
+public final class CommandUtil {
+
+  private CommandUtil() {
+    // Not instantiable: static helpers only.
+  }
+
+  /**
+   * Extracts the portion of a URI that follows its final slash.
+   *
+   * @param uri URI to take the basename of.
+   * @return The text after the last slash, or the whole URI when it contains no slash.
+   */
+  private static String uriBasename(String uri) {
+    int slashIndex = uri.lastIndexOf("/");
+    if (slashIndex < 0) {
+      // No path separator at all: the URI is its own basename.
+      return uri;
+    }
+
+    String trailing = uri.substring(slashIndex + 1);
+    MorePreconditions.checkNotBlank(trailing, "URI must not end with a slash.");
+    return trailing;
+  }
+
+  /**
+   * Builds a command description that fetches the executor binary at the given URI, marks it
+   * executable, and runs it by its basename from the current directory.
+   *
+   * @param executorUri URI to the executor.
+   * @return A command that will fetch and execute the executor.
+   */
+  public static CommandInfo create(String executorUri) {
+    MorePreconditions.checkNotBlank(executorUri);
+
+    URI executorFetch = URI.newBuilder()
+        .setValue(executorUri)
+        .setExecutable(true)
+        .build();
+    String launchCommand = "./" + uriBasename(executorUri);
+
+    return CommandInfo.newBuilder()
+        .addUris(executorFetch)
+        .setValue(launchCommand)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
new file mode 100644
index 0000000..2f84b5c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskState;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.ScheduleStatus;
+
+/**
+ * Collection of utility functions to convert mesos protobuf types to internal thrift types.
+ */
+public final class Conversions {
+
+  private static final Logger LOG = Logger.getLogger(Conversions.class.getName());
+
+  private Conversions() {
+    // Utility class.
+  }
+
+  // Maps from mesos state to scheduler interface state.
+  private static final Map<TaskState, ScheduleStatus> STATE_TRANSLATION =
+      new ImmutableMap.Builder<TaskState, ScheduleStatus>()
+          .put(TaskState.TASK_STARTING, ScheduleStatus.STARTING)
+          .put(TaskState.TASK_RUNNING, ScheduleStatus.RUNNING)
+          .put(TaskState.TASK_FINISHED, ScheduleStatus.FINISHED)
+          .put(TaskState.TASK_FAILED, ScheduleStatus.FAILED)
+          .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED)
+          .put(TaskState.TASK_LOST, ScheduleStatus.LOST)
+          .build();
+
+  /**
+   * Converts a protobuf state to an internal schedule status.
+   *
+   * @param taskState Protobuf state.
+   * @return Equivalent thrift-generated state.
+   */
+  public static ScheduleStatus convertProtoState(TaskState taskState) {
+    ScheduleStatus status = STATE_TRANSLATION.get(taskState);
+    Preconditions.checkArgument(status != null, "Unrecognized task state " + taskState);
+    return status;
+  }
+
+  private static final Function<Protos.Attribute, String> ATTRIBUTE_NAME =
+      new Function<Protos.Attribute, String>() {
+        @Override public String apply(Protos.Attribute attr) {
+          return attr.getName();
+        }
+      };
+
+  /**
+   * Typedef to make anonymous implementation more concise.
+   */
+  private abstract static class AttributeConverter
+      implements Function<Entry<String, Collection<Protos.Attribute>>, Attribute> {
+  }
+
+  private static final Function<Protos.Attribute, String> VALUE_CONVERTER =
+      new Function<Protos.Attribute, String>() {
+        @Override public String apply(Protos.Attribute attribute) {
+          switch (attribute.getType()) {
+            case SCALAR:
+              return String.valueOf(attribute.getScalar().getValue());
+
+            case TEXT:
+              return attribute.getText().getValue();
+
+            default:
+              LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring.");
+              return null;
+          }
+        }
+      };
+
+  private static final AttributeConverter ATTRIBUTE_CONVERTER = new AttributeConverter() {
+    @Override public Attribute apply(Entry<String, Collection<Protos.Attribute>> entry) {
+      // Convert values and filter any that were ignored.
+      return new Attribute(
+          entry.getKey(),
+          FluentIterable.from(entry.getValue())
+              .transform(VALUE_CONVERTER)
+              .filter(Predicates.notNull())
+              .toSet());
+    }
+  };
+
+  /**
+   * Converts protobuf attributes into thrift-generated attributes.
+   *
+   * @param offer Resource offer.
+   * @return Equivalent thrift host attributes.
+   */
+  public static HostAttributes getAttributes(Offer offer) {
+    // Group by attribute name.
+    Multimap<String, Protos.Attribute> valuesByName =
+        Multimaps.index(offer.getAttributesList(), ATTRIBUTE_NAME);
+
+    // TODO(William Farner): Include slave id.
+    return new HostAttributes(
+        offer.getHostname(),
+        FluentIterable.from(valuesByName.asMap().entrySet())
+            .transform(ATTRIBUTE_CONVERTER)
+            .toSet())
+        .setSlaveId(offer.getSlaveId().getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
new file mode 100644
index 0000000..008e1cb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import com.twitter.aurora.gen.JobKey;
+import com.twitter.aurora.gen.TaskQuery;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Utility class providing convenience functions relating to JobKeys.
+ */
+public final class JobKeys {
+  private JobKeys() {
+    // Utility class.
+  }
+
+  public static final Function<IJobConfiguration, IJobKey> FROM_CONFIG =
+      new Function<IJobConfiguration, IJobKey>() {
+        @Override public IJobKey apply(IJobConfiguration job) {
+          return job.getKey();
+        }
+      };
+
+  public static final Function<IJobKey, String> TO_ROLE =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getRole();
+        }
+      };
+
+  public static final Function<IJobKey, String> TO_ENVIRONMENT =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getEnvironment();
+        }
+      };
+
+  public static final Function<IJobKey, String> TO_JOB_NAME =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getName();
+        }
+      };
+
+  public static final Function<IJobConfiguration, String> CONFIG_TO_ROLE =
+      Functions.compose(TO_ROLE, FROM_CONFIG);
+
+  /**
+   * Check that a jobKey struct is valid.
+   *
+   * @param jobKey The jobKey to validate.
+   * @return {@code true} if the jobKey validates.
+   */
+  public static boolean isValid(@Nullable IJobKey jobKey) {
+    return jobKey != null
+        && !Strings.isNullOrEmpty(jobKey.getRole())
+        && !Strings.isNullOrEmpty(jobKey.getEnvironment())
+        && !Strings.isNullOrEmpty(jobKey.getName());
+  }
+
+  /**
+   * Assert that a jobKey struct is valid.
+   *
+   * @param jobKey The key struct to validate.
+   * @return The validated jobKey argument.
+   * @throws IllegalArgumentException if the key struct fails to validate.
+   */
+  public static IJobKey assertValid(IJobKey jobKey) throws IllegalArgumentException {
+    checkArgument(isValid(jobKey));
+
+    return jobKey;
+  }
+
+  /**
+   * Attempt to create a valid JobKey from the given (role, environment, name) triple.
+   *
+   * @param role The job's role.
+   * @param environment The job's environment.
+   * @param name The job's name.
+   * @return A valid JobKey if it can be created.
+   * @throws IllegalArgumentException if the key fails to validate.
+   */
+  public static IJobKey from(String role, String environment, String name)
+      throws IllegalArgumentException {
+
+    IJobKey job = IJobKey.build(new JobKey()
+        .setRole(role)
+        .setEnvironment(environment)
+        .setName(name));
+    return assertValid(job);
+  }
+
+  /**
+   * Attempts to create a valid JobKey from the given task.
+   *
+   * @param task The task to create job key from.
+   * @return A valid JobKey if it can be created.
+   * @throws IllegalArgumentException if the key fails to validate.
+   */
+  public static IJobKey from(ITaskConfig task) throws IllegalArgumentException {
+    return from(task.getOwner().getRole(), task.getEnvironment(), task.getJobName());
+  }
+
+  /**
+   * Create a "/"-delimited String representation of a job key, suitable for logging but not
+   * necessarily suitable for use as a unique identifier.
+   *
+   * @param jobKey Key to represent.
+   * @return "/"-delimited representation of the key.
+   */
+  public static String toPath(IJobKey jobKey) {
+    return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
+  }
+
+  /**
+   * Create a "/"-delimited String representation of job key, suitable for logging but not
+   * necessarily suitable for use as a unique identifier.
+   *
+   * @param job Job to represent.
+   * @return "/"-delimited representation of the job's key.
+   */
+  public static String toPath(IJobConfiguration job) {
+    return toPath(job.getKey());
+  }
+
+  /**
+   * Attempt to extract a job key from the given query if it is scoped to a single job.
+   *
+   * @param query Query to extract the key from.
+   * @return A present if one can be extracted, absent otherwise.
+   */
+  public static Optional<IJobKey> from(Query.Builder query) {
+    if (Query.isJobScoped(query)) {
+      TaskQuery taskQuery = query.get();
+      return Optional.of(
+          from(taskQuery.getOwner().getRole(), taskQuery.getEnvironment(), taskQuery.getJobName()));
+
+    } else {
+      return Optional.absent();
+    }
+  }
+}