You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:25 UTC

[32/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
new file mode 100644
index 0000000..a085d7d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Job scheduler that accepts any job and executes it immediately.
+ */
+class ImmediateJobManager extends JobManager {
+
+  private static final Logger LOG = Logger.getLogger(ImmediateJobManager.class.getName());
+
+  private final StateManager stateManager;
+  private final Storage storage;
+
+  @Inject
+  ImmediateJobManager(StateManager stateManager, Storage storage) {
+    this.stateManager = checkNotNull(stateManager);
+    this.storage = checkNotNull(storage);
+  }
+
+  @Override
+  public String getUniqueKey() {
+    return "IMMEDIATE";
+  }
+
+  @Override
+  public boolean receiveJob(SanitizedConfiguration config) {
+    LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
+    stateManager.insertPendingTasks(config.getTaskConfigs());
+    return true;
+  }
+
+  @Override
+  public boolean hasJob(final IJobKey jobKey) {
+    return !Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()).isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
new file mode 100644
index 0000000..3b8f5d8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
@@ -0,0 +1,119 @@
+package com.twitter.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * An action that either accepts a configuration or rejects it with a reason.
+ */
+public interface JobFilter {
+  /**
+   * Accept the JobConfiguration or reject it with a reason.
+   *
+   * @param jobConfiguration The job configuration to filter.
+   * @return A result and the reason the result was reached.
+   */
+  JobFilterResult filter(IJobConfiguration jobConfiguration);
+
+  /**
+   * Accept the TaskConfig with the specified number of instances
+   * or reject it with a reason.
+   *
+   * @param template The task configuration to filter.
+   * @param instanceCount Number of instances to apply {@code template} to.
+   * @return A result and the reason the result was reached.
+   */
+  JobFilterResult filter(ITaskConfig template, int instanceCount);
+
+  /**
+   * An immutable indication of whether a job passed a filter or not, together with the reason
+   * the result was reached.
+   */
+  public static final class JobFilterResult {
+    private final boolean pass;
+    // Never null; enforced by the constructor.
+    private final String reason;
+
+    private JobFilterResult(boolean pass, String reason) {
+      this.pass = pass;
+      this.reason = Preconditions.checkNotNull(reason);
+    }
+
+    /**
+     * Create a new result indicating the job has passed the filter.
+     *
+     * @return A result indicating the job has passed with a default reason.
+     * @see #fail(String)
+     */
+    public static JobFilterResult pass() {
+      return new JobFilterResult(true, "Accepted by filter.");
+    }
+
+    /**
+     * Create a new result indicating the job has passed the filter.
+     *
+     * @param reason A reason that the job has passed.
+     * @return A result indicating the job has passed because of the given reason.
+     * @throws NullPointerException if {@code reason} is {@code null}.
+     * @see #fail(String)
+     */
+    public static JobFilterResult pass(String reason) {
+      return new JobFilterResult(true, reason);
+    }
+
+    /**
+     * Create a new result indicating the job has failed the filter.
+     *
+     * @param reason A reason that the job has failed.
+     * @return A result indicating the job has failed because of the given reason.
+     * @throws NullPointerException if {@code reason} is {@code null}.
+     * @see #pass()
+     * @see #pass(String)
+     */
+    public static JobFilterResult fail(String reason) {
+      return new JobFilterResult(false, reason);
+    }
+
+    /**
+     * Indicates whether the result indicates a pass.
+     *
+     * @return {@code true} if the result indicates a pass.
+     */
+    public boolean isPass() {
+      return pass;
+    }
+
+    /**
+     * Indicates the reason for the result.
+     *
+     * @return The reason for the result; never {@code null}.
+     */
+    public String getReason() {
+      return reason;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (!(o instanceof JobFilterResult)) {
+        return false;
+      }
+      JobFilterResult that = (JobFilterResult) o;
+      // Compare the primitive directly instead of autoboxing it through Objects.equal.
+      // reason is guaranteed non-null by the constructor, so it can be dereferenced safely.
+      return pass == that.pass && reason.equals(that.reason);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(pass, reason);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("pass", pass)
+          .add("reason", reason)
+          .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobManager.java b/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
new file mode 100644
index 0000000..4cb4ff6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Collections;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Base class for job managers. A job manager decides whether, and when, the execution of a job
+ * should be triggered.
+ */
+public abstract class JobManager {
+
+  // TODO(Bill Farner): Remove this. It is only used since the CronJobManager and SchedulerCoreImpl
+  // have a circular dependency.
+  @Inject
+  protected SchedulerCore schedulerCore;
+
+  /**
+   * Returns the key that identifies this kind of manager and distinguishes it from other
+   * schedulers. Because these keys are persisted, an implementation must never change its key.
+   *
+   * @return Job manager key.
+   */
+  public abstract String getUniqueKey();
+
+  /**
+   * Offers a job to this manager. An accepted job may be handed to the job runner before this
+   * method returns, or at any later time. A return value of {@code false} means this manager
+   * will not execute the job.
+   *
+   * @param config The job to schedule.
+   * @return {@code true} if the manager accepted the job, {@code false} otherwise.
+   * @throws ScheduleException If there is a problem with scheduling the job.
+   */
+  public abstract boolean receiveJob(SanitizedConfiguration config) throws ScheduleException;
+
+  // TODO(ksweeney): Consider adding a Map<JobKey, JobConfiguration> to complement this.
+  /**
+   * Returns the job configurations held by this manager. The base implementation holds no jobs
+   * and always returns an empty list.
+   *
+   * @return Jobs stored by this job manager.
+   */
+  public Iterable<IJobConfiguration> getJobs() {
+    return Collections.<IJobConfiguration>emptyList();
+  }
+
+  /**
+   * Reports whether this manager holds a job with the given key.
+   *
+   * @param jobKey Job key.
+   * @return {@code true} if the manager has a matching job, {@code false} otherwise.
+   */
+  public abstract boolean hasJob(IJobKey jobKey);
+
+  /**
+   * Asks the manager to delete every job it holds with the given key. Subclasses may override
+   * this; the base implementation deletes nothing and returns {@code false}.
+   *
+   * @param jobKey Job key.
+   * @return {@code true} if a matching job was deleted.
+   */
+  public boolean deleteJob(IJobKey jobKey) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
new file mode 100644
index 0000000..58d7b0e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * Defines the {@link ILock} primitives: acquire, release and validate.
+ */
+public interface LockManager {
+  /**
+   * Creates, saves and returns a new {@link ILock} with the specified {@link ILockKey}.
+   * This method is not re-entrant: attempting to acquire a lock while a lock with the
+   * same key already exists throws a {@link LockException}.
+   *
+   * @param lockKey A key uniquely identifying the lock to be created.
+   * @param user Name of the user requesting a lock.
+   * @return A new ILock instance.
+   * @throws LockException In case the lock with specified key already exists.
+   */
+  ILock acquireLock(ILockKey lockKey, String user) throws LockException;
+
+  /**
+   * Releases (removes) the specified {@link ILock} from the system.
+   *
+   * @param lock {@link ILock} to remove from the system.
+   */
+  void releaseLock(ILock lock);
+
+  /**
+   * Verifies that the provided lock is consistent with the scheduler's lock for the operation
+   * context represented by {@code context}.
+   * <p>
+   * If the context is locked, {@code heldLock} must be present and identical to the stored lock.
+   * If the context is not locked, validation passes only when {@code heldLock} is absent.
+   *
+   * @param context Operation context to validate with the provided lock.
+   * @param heldLock Lock held by the caller, or absent if the caller holds no lock.
+   * @throws LockException If the context is locked and {@code heldLock} is absent or not identical
+   *     to the stored lock, or if the context is not locked but {@code heldLock} is present.
+   */
+  void validateIfLocked(ILockKey context, Optional<ILock> heldLock) throws LockException;
+
+  /**
+   * Thrown when an {@link ILock}-related operation fails.
+   */
+  class LockException extends Exception {
+    /**
+     * Creates an exception with the given detail message.
+     *
+     * @param msg Human-readable description of the failure.
+     */
+    public LockException(String msg) {
+      super(msg);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
new file mode 100644
index 0000000..4667f9f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Date;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.gen.Lock;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link LockManager} backed by the scheduler's lock store. It provides the mutual-exclusion
+ * primitives that guard critical, state-mutating scheduler operations.
+ */
+class LockManagerImpl implements LockManager {
+  private final Storage storage;
+  private final Clock clock;
+  private final UUIDGenerator tokenGenerator;
+
+  @Inject
+  LockManagerImpl(Storage storage, Clock clock, UUIDGenerator tokenGenerator) {
+    this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+    this.tokenGenerator = checkNotNull(tokenGenerator);
+  }
+
+  @Override
+  public ILock acquireLock(final ILockKey lockKey, final String user) throws LockException {
+    return storage.write(new MutateWork<ILock, LockException>() {
+      @Override
+      public ILock apply(MutableStoreProvider storeProvider) throws LockException {
+        LockStore.Mutable locks = storeProvider.getLockStore();
+        Optional<ILock> holder = locks.fetchLock(lockKey);
+
+        // Refuse to hand out a second lock for a key that is already locked.
+        if (holder.isPresent()) {
+          ILock current = holder.get();
+          throw new LockException(String.format(
+              "Operation for: %s is already in progress. Started at: %s. Current owner: %s.",
+              formatLockKey(lockKey),
+              new Date(current.getTimestampMs()).toString(),
+              current.getUser()));
+        }
+
+        Lock fresh = new Lock()
+            .setKey(lockKey.newBuilder())
+            .setToken(tokenGenerator.createNew().toString())
+            .setTimestampMs(clock.nowMillis())
+            .setUser(user);
+        ILock created = ILock.build(fresh);
+        locks.saveLock(created);
+        return created;
+      }
+    });
+  }
+
+  @Override
+  public void releaseLock(final ILock lock) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        // Removal is keyed on the lock's key alone.
+        storeProvider.getLockStore().removeLock(lock.getKey());
+      }
+    });
+  }
+
+  @Override
+  public synchronized void validateIfLocked(final ILockKey context, Optional<ILock> heldLock)
+      throws LockException {
+
+    Optional<ILock> storedLock = storage.consistentRead(new Work.Quiet<Optional<ILock>>() {
+      @Override
+      public Optional<ILock> apply(StoreProvider storeProvider) {
+        return storeProvider.getLockStore().fetchLock(context);
+      }
+    });
+
+    // Valid combinations are exactly:
+    //   stored lock present and identical to the held lock;
+    //   no stored lock and no held lock.
+    // Every other combination is rejected below.
+    if (storedLock.equals(heldLock)) {
+      return;
+    }
+    if (storedLock.isPresent()) {
+      // The context is locked, but the caller holds a different lock or none at all.
+      throw new LockException(String.format(
+          "Unable to perform operation for: %s. Use override/cancel option.",
+          formatLockKey(context)));
+    }
+    if (heldLock.isPresent()) {
+      // The caller presents a lock for a context that is not locked.
+      throw new LockException(
+          String.format("Invalid operation context: %s", formatLockKey(context)));
+    }
+  }
+
+  /**
+   * Renders a lock key for use in error messages.
+   */
+  private static String formatLockKey(ILockKey key) {
+    switch (key.getSetField()) {
+      case JOB:
+        return JobKeys.toPath(key.getJob());
+      default:
+        return "Unknown lock key type: " + key.getSetField();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
new file mode 100644
index 0000000..d447f52
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.HostStatus;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+
+/**
+ * Logic that puts hosts into maintenance mode, and triggers draining of hosts upon request.
+ * All state-changing functions return their results.  Additionally, all state-changing functions
+ * will ignore requests to change state of unknown hosts and subsequently omit these hosts from
+ * return values.
+ */
+public interface MaintenanceController {
+
+  /**
+   * Places hosts in maintenance mode.
+   * Hosts in maintenance mode are less-preferred for scheduling.
+   * No change will be made for hosts that are not recognized, and unrecognized hosts will not be
+   * included in the result.
+   *
+   * @param hosts Hosts to put into maintenance mode.
+   * @return The adjusted state of the hosts.
+   */
+  Set<HostStatus> startMaintenance(Set<String> hosts);
+
+  /**
+   * Initiate a drain of all active tasks on {@code hosts}.
+   *
+   * @param hosts Hosts to drain.
+   * @return The adjusted state of the hosts.  Hosts without any active tasks will be immediately
+   *         moved to DRAINED.
+   */
+  Set<HostStatus> drain(Set<String> hosts);
+
+  /**
+   * Fetches the current maintenance mode of {@code host}.
+   *
+   * @param host Host to fetch state for.
+   * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the host is not known.
+   */
+  MaintenanceMode getMode(String host);
+
+  /**
+   * Fetches the current state of {@code hosts}.
+   *
+   * @param hosts Hosts to fetch state for.
+   * @return The state of the hosts.  Hosts that are not known are omitted.
+   */
+  Set<HostStatus> getStatus(Set<String> hosts);
+
+  /**
+   * Moves {@code hosts} out of maintenance mode, returning them to mode NONE.
+   *
+   * @param hosts Hosts to move out of maintenance mode.
+   * @return The adjusted state of the hosts.
+   */
+  Set<HostStatus> endMaintenance(Set<String> hosts);
+
+  /**
+   * Storage-backed implementation.  Maintenance modes are kept in the attribute store, and every
+   * mode change the store accepts is published as a
+   * {@link PubsubEvent.HostMaintenanceStateChange} event.
+   */
+  class MaintenanceControllerImpl implements MaintenanceController, EventSubscriber {
+    private final Storage storage;
+    private final StateManager stateManager;
+    private final Closure<PubsubEvent> eventSink;
+
+    @Inject
+    public MaintenanceControllerImpl(
+        Storage storage,
+        StateManager stateManager,
+        Closure<PubsubEvent> eventSink) {
+
+      this.storage = checkNotNull(storage);
+      this.stateManager = checkNotNull(stateManager);
+      this.eventSink = checkNotNull(eventSink);
+    }
+
+    /**
+     * Moves {@code hosts} toward DRAINED.  Hosts with no active tasks are set to DRAINED right
+     * away.  For every other host, {@code callback} is invoked with the query selecting that
+     * host's active tasks, and the host is set to DRAINING.
+     */
+    private Set<HostStatus> watchDrainingTasks(
+        MutableStoreProvider store,
+        Set<String> hosts,
+        Closure<Query.Builder> callback) {
+
+      Set<String> emptyHosts = Sets.newHashSet();
+      for (String host : hosts) {
+        // If there are no tasks on the host, immediately transition to DRAINED.
+        Query.Builder query = Query.slaveScoped(host).active();
+        // Only emptiness matters here, so there is no need to materialize the task IDs.
+        if (store.getTaskStore().fetchTasks(query).isEmpty()) {
+          emptyHosts.add(host);
+        } else {
+          callback.execute(query);
+        }
+      }
+
+      return ImmutableSet.<HostStatus>builder()
+          .addAll(setMaintenanceMode(store, emptyHosts, DRAINED))
+          .addAll(setMaintenanceMode(store, Sets.difference(hosts, emptyHosts), DRAINING))
+          .build();
+    }
+
+    private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
+      return watchDrainingTasks(store, hosts, Closures.<Query.Builder>noop());
+    }
+
+    private static final Predicate<HostAttributes> IS_DRAINING = new Predicate<HostAttributes>() {
+      @Override public boolean apply(HostAttributes attributes) {
+        return DRAINING == attributes.getMode();
+      }
+    };
+
+    /**
+     * Notifies the MaintenanceController that storage has started, and maintenance statuses are
+     * ready to be loaded.  Hosts recorded as DRAINING are re-examined.  Any that no longer have
+     * active tasks are moved to DRAINED.
+     *
+     * @param started Event.
+     */
+    @Subscribe
+    public void storageStarted(StorageStarted started) {
+      storage.write(new MutateWork.NoResult.Quiet() {
+        @Override protected void execute(MutableStoreProvider storeProvider) {
+          Set<String> drainingHosts =
+              FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
+                  .filter(IS_DRAINING)
+                  .transform(HOST_NAME)
+                  .toSet();
+          watchDrainingTasks(storeProvider, drainingHosts);
+        }
+      });
+    }
+
+    /**
+     * Notifies the MaintenanceController that a task has changed state.  If the task terminated
+     * on a DRAINING host that has no remaining active tasks, the host is moved to DRAINED.
+     *
+     * @param change Event.
+     */
+    @Subscribe
+    public void taskChangedState(final TaskStateChange change) {
+      if (!Tasks.isTerminated(change.getNewState())) {
+        return;
+      }
+
+      final String host = change.getTask().getAssignedTask().getSlaveHost();
+      if (host == null) {
+        // NOTE(review): presumably the task terminated before it was ever assigned to a host,
+        // so no host's drain can complete because of it.  Skip the storage write entirely.
+        return;
+      }
+
+      storage.write(new MutateWork.NoResult.Quiet() {
+        @Override public void execute(MutableStoreProvider store) {
+          // If the task _was_ associated with a draining host, and it was the last task on the
+          // host, the drain is complete.
+          Optional<HostAttributes> attributes = store.getAttributeStore().getHostAttributes(host);
+          if (attributes.isPresent() && attributes.get().getMode() == DRAINING) {
+            Query.Builder builder = Query.slaveScoped(host).active();
+            if (store.getTaskStore().fetchTasks(builder).isEmpty()) {
+              setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
+            }
+          }
+        }
+      });
+    }
+
+    @Override
+    public Set<HostStatus> startMaintenance(final Set<String> hosts) {
+      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+        @Override public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
+          return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED);
+        }
+      });
+    }
+
+    @VisibleForTesting
+    static final Optional<String> DRAINING_MESSAGE =
+        Optional.of("Draining machine for maintenance.");
+
+    @Override
+    public Set<HostStatus> drain(final Set<String> hosts) {
+      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+        @Override public Set<HostStatus> apply(MutableStoreProvider store) {
+          // Active tasks on hosts that still have work are moved to RESTARTING.
+          return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
+            @Override public void execute(Query.Builder query) {
+              stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
+            }
+          });
+        }
+      });
+    }
+
+    private static final Function<HostAttributes, String> HOST_NAME =
+        new Function<HostAttributes, String>() {
+          @Override public String apply(HostAttributes attributes) {
+            return attributes.getHost();
+          }
+        };
+
+    private static final Function<HostAttributes, HostStatus> ATTRS_TO_STATUS =
+        new Function<HostAttributes, HostStatus>() {
+          @Override public HostStatus apply(HostAttributes attributes) {
+            return new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode());
+          }
+        };
+
+    private static final Function<HostStatus, MaintenanceMode> GET_MODE =
+        new Function<HostStatus, MaintenanceMode>() {
+          @Override public MaintenanceMode apply(HostStatus status) {
+            return status.getMode();
+          }
+        };
+
+    @Override
+    public MaintenanceMode getMode(final String host) {
+      return storage.weaklyConsistentRead(new Work.Quiet<MaintenanceMode>() {
+        @Override public MaintenanceMode apply(StoreProvider storeProvider) {
+          return storeProvider.getAttributeStore().getHostAttributes(host)
+              .transform(ATTRS_TO_STATUS)
+              .transform(GET_MODE)
+              .or(MaintenanceMode.NONE);
+        }
+      });
+    }
+
+    @Override
+    public Set<HostStatus> getStatus(final Set<String> hosts) {
+      return storage.weaklyConsistentRead(new Work.Quiet<Set<HostStatus>>() {
+        @Override public Set<HostStatus> apply(StoreProvider storeProvider) {
+          // Warning - this is filtering _all_ host attributes.  If using this to frequently query
+          // for a small set of hosts, a getHostAttributes variant should be added.
+          return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
+              .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
+              .transform(ATTRS_TO_STATUS).toSet();
+        }
+      });
+    }
+
+    @Override
+    public Set<HostStatus> endMaintenance(final Set<String> hosts) {
+      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+        @Override public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
+          return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE);
+        }
+      });
+    }
+
+    /**
+     * Sets {@code mode} on each of {@code hosts}.  A host is included in the result, and a
+     * {@link PubsubEvent.HostMaintenanceStateChange} is published for it, only when the
+     * attribute store reports that the mode was set.
+     */
+    private Set<HostStatus> setMaintenanceMode(
+        MutableStoreProvider storeProvider,
+        Set<String> hosts,
+        MaintenanceMode mode) {
+
+      AttributeStore.Mutable store = storeProvider.getAttributeStore();
+      ImmutableSet.Builder<HostStatus> statuses = ImmutableSet.builder();
+      for (String host : hosts) {
+        if (store.setMaintenanceMode(host, mode)) {
+          HostStatus status = new HostStatus().setHost(host).setMode(mode);
+          // Publish a copy so subscribers cannot mutate the status returned to the caller.
+          eventSink.execute(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
+          statuses.add(status);
+        }
+      }
+      return statuses.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
new file mode 100644
index 0000000..2db01c0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Scheduling core, stores scheduler state and makes decisions about which tasks to schedule when
+ * a resource offer is made.
+ *
+ * When a job is submitted to the scheduler core, it will store the job configuration and offer
+ * the job to all configured scheduler modules, which are responsible for triggering execution of
+ * the job.  Until a job is triggered by a scheduler module, it is retained in the scheduler core
+ * in the PENDING state.
+ */
+public interface SchedulerCore {
+
+  /**
+   * Creates a new job, whose tasks will become candidates for scheduling.
+   *
+   * @param sanitizedConfiguration The configuration of the job to create tasks for.
+   * @throws ScheduleException If the job could not be accepted for scheduling (for example, there
+   *     was an error scheduling a cron job).
+   * @throws TaskDescriptionException If an invalid task description was given.
+   */
+  void createJob(SanitizedConfiguration sanitizedConfiguration)
+      throws ScheduleException, TaskDescriptionException;
+
+  /**
+   * Adds new instances specified by the instances set.
+   * <p>
+   * Provided instance IDs must be disjoint from the instance IDs already active in the job.
+   *
+   * @param jobKey IJobKey identifying the parent job.
+   * @param instanceIds Set of instance IDs to be added to the job.
+   * @param config ITaskConfig to use with new instances.
+   * @throws ScheduleException If any of the provided instance IDs is already active in the job,
+   *     or the expanded job cannot be scheduled.
+   */
+  void addInstances(IJobKey jobKey, ImmutableSet<Integer> instanceIds, ITaskConfig config)
+      throws ScheduleException;
+
+  /**
+   * Validates that the new job configuration passes resource filters.
+   *
+   * @param sanitizedConfiguration Job configuration to validate.
+   * @throws ScheduleException If the job's resources do not pass the filters.
+   */
+  void validateJobResources(SanitizedConfiguration sanitizedConfiguration) throws ScheduleException;
+
+  /**
+   * Starts a cron job immediately.
+   *
+   * @param jobKey Job key.
+   * @throws ScheduleException If the specified job does not exist, or is not a cron job.
+   * @throws TaskDescriptionException If the parsing of the job failed.
+   */
+  void startCronJob(IJobKey jobKey) throws ScheduleException, TaskDescriptionException;
+
+  /**
+   * Assigns a new state to tasks.
+   *
+   * @param query Builder for a query identifying the tasks to update.
+   * @param status The new state of the tasks.
+   * @param message Additional information about the state transition.
+   */
+  void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+
+  /**
+   * Kills a specific set of tasks.
+   *
+   * @param query Builder for a query identifying the tasks to kill.
+   * @param user Name of the user performing the kill.
+   * @throws ScheduleException If a problem occurs with the kill request.
+   */
+  void killTasks(Query.Builder query, String user) throws ScheduleException;
+
+  /**
+   * Initiates a restart of shards within an active job.
+   *
+   * @param jobKey Key of job to be restarted.
+   * @param shards Shards to be restarted.
+   * @param requestingUser User performing the restart action.
+   * @throws ScheduleException If there are no matching active shards.
+   */
+  void restartShards(IJobKey jobKey, Set<Integer> shards, String requestingUser)
+      throws ScheduleException;
+
+  /**
+   * Preempts a task in favor of another.
+   *
+   * @param task Task being preempted.
+   * @param preemptingTask Task we are preempting in favor of.
+   * @throws ScheduleException If a problem occurs while trying to perform the preemption.
+   */
+  void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) throws ScheduleException;
+
+  /**
+   * Indicates to the scheduler that tasks were deleted on the assigned host.
+   *
+   * @param taskIds IDs of tasks that were deleted.
+   */
+  void tasksDeleted(Set<String> taskIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
new file mode 100644
index 0000000..cf74e49
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+import static com.twitter.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+
+/**
+ * Implementation of the scheduler core.
+ */
+class SchedulerCoreImpl implements SchedulerCore {
+  @Positive
+  @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
+  public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
+
+  private static final Logger LOG = Logger.getLogger(SchedulerCoreImpl.class.getName());
+
+  // This number is derived from the maximum file name length limit on most UNIX systems, less
+  // the number of characters we've observed being added by mesos for the executor ID, prefix, and
+  // delimiters.
+  @VisibleForTesting
+  static final int MAX_TASK_ID_LENGTH = 255 - 90;
+
+  private final Storage storage;
+
+  private final CronJobManager cronScheduler;
+
+  // Schedulers that are responsible for triggering execution of jobs, in priority order.
+  private final ImmutableList<JobManager> jobManagers;
+
+  // TODO(Bill Farner): Avoid using StateManagerImpl.
+  // State manager handles persistence of task modifications and state transitions.
+  private final StateManagerImpl stateManager;
+
+  private final TaskIdGenerator taskIdGenerator;
+  private final JobFilter jobFilter;
+
+  /**
+   * Creates a new core scheduler.
+   *
+   * @param storage Backing store implementation.
+   * @param cronScheduler Cron scheduler.
+   * @param immediateScheduler Immediate scheduler.
+   * @param stateManager Persistent state manager.
+   * @param taskIdGenerator Task ID generator.
+   * @param jobFilter Job filter.
+   */
+  @Inject
+  public SchedulerCoreImpl(
+      Storage storage,
+      CronJobManager cronScheduler,
+      ImmediateJobManager immediateScheduler,
+      StateManagerImpl stateManager,
+      TaskIdGenerator taskIdGenerator,
+      JobFilter jobFilter) {
+
+    this.storage = checkNotNull(storage);
+    this.cronScheduler = checkNotNull(cronScheduler);
+    checkNotNull(immediateScheduler);
+
+    // The immediate scheduler will accept any job, so it's important that other schedulers are
+    // placed first.
+    this.jobManagers = ImmutableList.of(cronScheduler, immediateScheduler);
+    this.stateManager = checkNotNull(stateManager);
+    this.taskIdGenerator = checkNotNull(taskIdGenerator);
+    this.jobFilter = checkNotNull(jobFilter);
+  }
+
+  /**
+   * Checks whether any configured job manager already holds a job with the same key.
+   *
+   * @param job Job whose key is looked up.
+   * @return {@code true} if at least one job manager reports having the job.
+   */
+  private boolean hasActiveJob(IJobConfiguration job) {
+    return Iterables.any(jobManagers, managerHasJob(job));
+  }
+
+  @Override
+  public synchronized void tasksDeleted(Set<String> taskIds) {
+    // Deleted tasks are moved to UNKNOWN without an audit message.
+    setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+  }
+
+  @Override
+  public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
+      throws ScheduleException {
+
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    if (hasActiveJob(job)) {
+      throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+    }
+
+    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+
+    // Offer the job to each manager in priority order; the first one to accept it owns it.
+    boolean accepted = false;
+    for (JobManager manager : jobManagers) {
+      if (manager.receiveJob(sanitizedConfiguration)) {
+        LOG.info("Job accepted by manager: " + manager.getUniqueKey());
+        accepted = true;
+        break;
+      }
+    }
+
+    if (!accepted) {
+      LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
+      LOG.severe("Discarded job: " + job);
+      throw new ScheduleException("Job not accepted, discarding.");
+    }
+  }
+
+  // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
+  /**
+   * Rejects a job (or an expansion of one) that would violate scheduling limits.
+   *
+   * @param jobKey Key of the job being created or expanded.
+   * @param task Task configuration for the new instances.
+   * @param count Number of instances being requested.
+   * @param incremental If {@code true}, {@code count} is added to the number of currently active
+   *     tasks in the job before the limits are checked.
+   * @throws ScheduleException If the generated task ID is too long, the job filter rejects the
+   *     job, or the total instance count exceeds {@link #MAX_TASKS_PER_JOB}.
+   */
+  private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
+      throws ScheduleException {
+
+    int instanceCount = count;
+    if (incremental) {
+      instanceCount +=
+          Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size();
+    }
+
+    // TODO(maximk): This is a short-term hack to stop the bleeding from
+    //               https://issues.apache.org/jira/browse/MESOS-691
+    if (taskIdGenerator.generate(task, instanceCount).length() > MAX_TASK_ID_LENGTH) {
+      throw new ScheduleException(
+          "Task ID is too long, please shorten your role or job name.");
+    }
+
+    JobFilter.JobFilterResult filterResult = jobFilter.filter(task, instanceCount);
+    // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
+    if (!filterResult.isPass()) {
+      throw new ScheduleException(filterResult.getReason());
+    }
+
+    if (instanceCount > MAX_TASKS_PER_JOB.get()) {
+      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
+    }
+  }
+
+  @Override
+  public void validateJobResources(SanitizedConfiguration sanitizedConfiguration)
+      throws ScheduleException {
+
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+  }
+
+  @Override
+  public void addInstances(
+      final IJobKey jobKey,
+      final ImmutableSet<Integer> instanceIds,
+      final ITaskConfig config) throws ScheduleException {
+
+    runJobFilters(jobKey, config, instanceIds.size(), true);
+    storage.write(new MutateWork.NoResult<ScheduleException>() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider)
+          throws ScheduleException {
+
+        ImmutableSet<IScheduledTask> tasks =
+            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
+
+        Set<Integer> existingInstanceIds =
+            FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
+        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
+          throw new ScheduleException("Instance ID collision detected.");
+        }
+
+        stateManager.insertPendingTasks(Maps.asMap(instanceIds, Functions.constant(config)));
+      }
+    });
+  }
+
+  @Override
+  public synchronized void startCronJob(IJobKey jobKey)
+      throws ScheduleException, TaskDescriptionException {
+
+    checkNotNull(jobKey);
+
+    if (!cronScheduler.hasJob(jobKey)) {
+      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
+    }
+
+    cronScheduler.startJobNow(jobKey);
+  }
+
+  /**
+   * Creates a predicate that will determine whether a job manager has a job matching a job key.
+   *
+   * @param job Job to match.
+   * @return A new predicate matching the job owner and name given.
+   */
+  private static Predicate<JobManager> managerHasJob(final IJobConfiguration job) {
+    return new Predicate<JobManager>() {
+      @Override public boolean apply(JobManager manager) {
+        return manager.hasJob(job.getKey());
+      }
+    };
+  }
+
+  @Override
+  public synchronized void setTaskStatus(
+      Query.Builder query,
+      final ScheduleStatus status,
+      Optional<String> message) {
+
+    checkNotNull(query);
+    checkNotNull(status);
+
+    stateManager.changeState(query, status, message);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>If the query addresses an entire job, every job manager is first asked to delete the job.
+   * Unless the caller explicitly supplied task statuses, the kill is restricted to tasks in
+   * {@code ACTIVE_STATES}; caller-supplied statuses are used as given.
+   */
+  @Override
+  public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+    checkNotNull(query);
+    LOG.info("Killing tasks matching " + query);
+
+    boolean jobDeleted = false;
+
+    if (Query.isOnlyJobScoped(query)) {
+      // If this looks like a query for all tasks in a job, instruct the scheduler modules to
+      // delete the job. Every manager is asked (no short-circuit), since any of them may hold it.
+      IJobKey jobKey = JobKeys.from(query).get();
+      for (JobManager manager : jobManagers) {
+        if (manager.deleteJob(jobKey)) {
+          jobDeleted = true;
+        }
+      }
+    }
+
+    // Unless statuses were specifically supplied, only attempt to kill active tasks. Statuses
+    // supplied by the caller are honored rather than overwritten.
+    Query.Builder taskQuery = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
+
+    int tasksAffected =
+        stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
+    if (!jobDeleted && (tasksAffected == 0)) {
+      throw new ScheduleException("No jobs to kill");
+    }
+  }
+
+  @Override
+  public void restartShards(
+      IJobKey jobKey,
+      final Set<Integer> shards,
+      final String requestingUser) throws ScheduleException {
+
+    if (!JobKeys.isValid(jobKey)) {
+      throw new ScheduleException("Invalid job key: " + jobKey);
+    }
+
+    if (shards.isEmpty()) {
+      throw new ScheduleException("At least one shard must be specified.");
+    }
+
+    final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
+    storage.write(new MutateWork.NoResult<ScheduleException>() {
+      @Override protected void execute(MutableStoreProvider storeProvider)
+          throws ScheduleException {
+
+        // Every requested shard must map to exactly one active task, or nothing is restarted.
+        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+        if (matchingTasks.size() != shards.size()) {
+          throw new ScheduleException("Not all requested shards are active.");
+        }
+        LOG.info("Restarting shards matching " + query);
+        stateManager.changeState(
+            Query.taskScoped(Tasks.ids(matchingTasks)),
+            RESTARTING,
+            Optional.of("Restarted by " + requestingUser));
+      }
+    });
+  }
+
+  @Override
+  public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
+    checkNotNull(task);
+    checkNotNull(preemptingTask);
+    // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
+
+    stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+        Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
new file mode 100644
index 0000000..23ffff9
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StorageException;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.common.base.Closure;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around the persistent storage and mutable state.
+ */
+class SideEffectStorage {
+
+  // Pubsub events queued via SideEffectWork#addTaskEvent. They are dispatched to taskEventSink
+  // when the top-level write completes normally, and discarded if that write fails.
+  private final Queue<PubsubEvent> events = Lists.newLinkedList();
+
+  @VisibleForTesting
+  Queue<PubsubEvent> getEvents() {
+    return events;
+  }
+
+  // True while a write issued through this wrapper is in progress; lets nested writes detect
+  // that they are not the top-level operation.
+  private final AtomicBoolean inOperation = new AtomicBoolean(false);
+
+  private final Storage storage;
+  private final OperationFinalizer operationFinalizer;
+  private final Closure<PubsubEvent> taskEventSink;
+
+  interface OperationFinalizer {
+    /**
+     * Performs any work necessary to complete the operation.
+     * This is executed in the context of a write operation, immediately after the work
+     * executes normally.
+     * NOTE: At present, this is executed for every nesting level of operations, rather than
+     * at the completion of the top-level operation.
+     * See the TODO in {@code SideEffectStorage.executeSideEffectsAfter(SideEffectWork)}
+     * for more detail.
+     *
+     * @param work Work to finalize.
+     * @param storeProvider Mutable store reference.
+     */
+    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Creates a side-effect-aware wrapper around {@code storage}.
+   *
+   * @param storage Underlying storage that reads and writes are delegated to.
+   * @param operationFinalizer Invoked after each write's work completes normally.
+   * @param taskEventSink Receives queued pubsub events once a top-level write completes.
+   */
+  SideEffectStorage(
+      Storage storage,
+      OperationFinalizer operationFinalizer,
+      Closure<PubsubEvent> taskEventSink) {
+
+    this.storage = checkNotNull(storage);
+    this.operationFinalizer = checkNotNull(operationFinalizer);
+    this.taskEventSink = checkNotNull(taskEventSink);
+  }
+
+  /**
+   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
+   * Events queued by the work are published only if the top-level operation completes
+   * normally; otherwise they are discarded.
+   *
+   * @param work Work to perform.
+   * @param <T> Work return type
+   * @param <E> Work exception type.
+   * @return The work return value.
+   * @throws E The work exception.
+   */
+  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
+    return storage.write(executeSideEffectsAfter(work));
+  }
+
+  /**
+   * Performs read-only work by delegating to {@code Storage.consistentRead}. No finalizer or
+   * event handling is applied.
+   *
+   * @param work Work to perform.
+   * @param <T> Work return type.
+   * @param <E> Work exception type.
+   * @return The work return value.
+   * @throws StorageException If the underlying storage fails.
+   * @throws E The work exception.
+   */
+  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    return storage.consistentRead(work);
+  }
+
+  /**
+   * Work that has side effects external to the storage system.
+   * Work may add side effect and pubsub events, which will be executed/sent upon normal
+   * completion of the operation.
+   *
+   * @param <T> Work return type.
+   * @param <E> Work exception type.
+   */
+  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
+    /**
+     * Queues a pubsub event to be published when the enclosing top-level write completes.
+     *
+     * @param notice Event to queue; must not be null.
+     * @throws IllegalStateException If called outside a write issued through this wrapper.
+     */
+    protected final void addTaskEvent(PubsubEvent notice) {
+      Preconditions.checkState(inOperation.get());
+      events.add(checkNotNull(notice));
+    }
+  }
+
+  /**
+   * Work with side effects which does not throw checked exceptions.
+   *
+   * @param <T>   Work return type.
+   */
+  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
+  }
+
+  /**
+   * Work with side effects that does not have a return value.
+   *
+   * @param <E> Work exception type.
+   */
+  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
+
+    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
+      execute(storeProvider);
+      return null;
+    }
+
+    abstract void execute(MutableStoreProvider storeProvider) throws E;
+  }
+
+  /**
+   * Work with side effects which does not throw checked exceptions or have a return
+   * value.
+   */
+  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
+  }
+
+  /**
+   * Wraps {@code work} so that the finalizer runs after it, and queued pubsub events are
+   * flushed to the event sink once the top-level operation completes normally.
+   *
+   * <p>If the top-level operation does not complete normally (the work, the finalizer, or the
+   * event sink throws), any events still queued are discarded. Otherwise they would remain in
+   * the shared queue and be published by a later, unrelated write.
+   */
+  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
+      final SideEffectWork<T, E> work) {
+
+    return new MutateWork<T, E>() {
+      @Override public T apply(MutableStoreProvider storeProvider) throws E {
+        boolean topLevelOperation = inOperation.compareAndSet(false, true);
+        boolean completedNormally = false;
+
+        try {
+          T result = work.apply(storeProvider);
+
+          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
+          // seems wrong.  Double-check whether this is necessary, or if only the top-level
+          // operation should be executing the finalizer.  Update doc on OperationFinalizer
+          // once this is assessed.
+          operationFinalizer.finalize(work, storeProvider);
+          if (topLevelOperation) {
+            while (!events.isEmpty()) {
+              taskEventSink.execute(events.remove());
+            }
+          }
+          completedNormally = true;
+          return result;
+        } finally {
+          if (topLevelOperation) {
+            if (!completedNormally) {
+              // The write failed, so queued events may describe changes that were not
+              // persisted; drop them rather than publishing them with the next write.
+              events.clear();
+            }
+            inOperation.set(false);
+          }
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
new file mode 100644
index 0000000..099ec70
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.SlaveID;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Thin interface for the state manager.
+ */
+public interface StateManager {
+
+  /**
+   * Performs a simple state change, transitioning all tasks matching a query to the given
+   * state and applying the given audit message.
+   * TODO(William Farner): Consider removing the return value.
+   *
+   * @param query Builder of the query to perform, the results of which will be modified.
+   * @param newState State to move the resulting tasks into.
+   * @param auditMessage Audit message to apply along with the state change.
+   * @return the number of successful state changes.
+   */
+  int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+
+  /**
+   * Assigns a task to a specific slave.
+   * This will modify the task record to reflect the host assignment and return the updated record.
+   *
+   * @param taskId ID of the task to mutate.
+   * @param slaveHost Host name that the task is being assigned to.
+   * @param slaveId ID of the slave that the task is being assigned to.
+   * @param assignedPorts Ports on the host that are being assigned to the task.
+   * @return The updated task record, or {@code null} if the task was not found.
+   */
+  IAssignedTask assignTask(
+      String taskId,
+      String slaveHost,
+      SlaveID slaveId,
+      Set<Integer> assignedPorts);
+
+  /**
+   * Inserts new tasks into the store. Tasks will immediately move into PENDING and will be eligible
+   * for scheduling.
+   *
+   * @param tasks Tasks to insert, mapped by their instance IDs.
+   */
+  void insertPendingTasks(Map<Integer, ITaskConfig> tasks);
+
+  /**
+   * Deletes records of tasks from the task store.
+   * This will not perform any state checking or state transitions, but will immediately remove
+   * the tasks from the store.  It will also silently ignore attempts to delete task IDs that do
+   * not exist.
+   *
+   * @param taskIds IDs of tasks to delete.
+   */
+  void deleteTasks(Set<String> taskIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
new file mode 100644
index 0000000..37d13f4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -0,0 +1,458 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.mesos.Protos.SlaveID;
+
+import com.twitter.aurora.gen.AssignedTask;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.scheduler.Driver;
+import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.base.Closure;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+
+import static com.twitter.aurora.gen.ScheduleStatus.INIT;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.UNKNOWN;
+import static com.twitter.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
+ * persisted state machine transitions, and their side-effects.
+ *
+ * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
+ * modify managerState.
+ */
+public class StateManagerImpl implements StateManager {
+  private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
+
+  private final SideEffectStorage storage;
+  @VisibleForTesting
+  SideEffectStorage getStorage() {
+    return storage;
+  }
+
+  private final TaskIdGenerator taskIdGenerator;
+
+  // Work queue to receive state machine side effect work.
+  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
+  // a delete is always processed after a state transition.
+  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
+      new Comparator<WorkEntry>() {
+        @Override public int compare(WorkEntry a, WorkEntry b) {
+          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
+            return (a.command == WorkCommand.DELETE) ? 1 : -1;
+          } else {
+            return 0;
+          }
+        }
+      });
+
+  // Adapt the work queue into a sink.
+  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
+      @Override public void addWork(
+          WorkCommand work,
+          TaskStateMachine stateMachine,
+          Function<IScheduledTask, IScheduledTask> mutation) {
+
+        workQueue.add(new WorkEntry(work, stateMachine, mutation));
+      }
+    };
+
+  private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
+      new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
+        @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
+          ITaskConfig task = entry.getValue();
+          AssignedTask assigned = new AssignedTask()
+              .setTaskId(taskIdGenerator.generate(task, entry.getKey()))
+              .setInstanceId(entry.getKey())
+              .setTask(task.newBuilder());
+          return IScheduledTask.build(new ScheduledTask()
+              .setStatus(INIT)
+              .setAssignedTask(assigned));
+        }
+      };
+
+  private final Driver driver;
+  private final Clock clock;
+
+  /**
+   * An item of work on the work queue.
+   */
+  private static class WorkEntry {
+    private final WorkCommand command;
+    private final TaskStateMachine stateMachine;
+    private final Function<IScheduledTask, IScheduledTask> mutation;
+
+    WorkEntry(
+        WorkCommand command,
+        TaskStateMachine stateMachine,
+        Function<IScheduledTask, IScheduledTask> mutation) {
+
+      this.command = command;
+      this.stateMachine = stateMachine;
+      this.mutation = mutation;
+    }
+  }
+
+  @Inject
+  StateManagerImpl(
+      final Storage storage,
+      final Clock clock,
+      Driver driver,
+      TaskIdGenerator taskIdGenerator,
+      Closure<PubsubEvent> taskEventSink) {
+
+    checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+
+    OperationFinalizer finalizer = new OperationFinalizer() {
+      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
+        processWorkQueueInWriteOperation(work, store);
+      }
+    };
+
+    this.storage = new SideEffectStorage(storage, finalizer, taskEventSink);
+
+    this.driver = checkNotNull(driver);
+    this.taskIdGenerator = checkNotNull(taskIdGenerator);
+
+    Stats.exportSize("work_queue_depth", workQueue);
+  }
+
+  @Override
+  public void insertPendingTasks(final Map<Integer, ITaskConfig> tasks) {
+    checkNotNull(tasks);
+
+    // Done outside the write transaction to minimize the work done inside a transaction.
+    final Set<IScheduledTask> scheduledTasks =
+        ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
+
+    storage.write(storage.new NoResultQuietSideEffectWork() {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
+
+        for (IScheduledTask task : scheduledTasks) {
+          createStateMachine(task).updateState(PENDING);
+        }
+      }
+    });
+  }
+
+  @Override
+  public int changeState(
+      Query.Builder query,
+      final ScheduleStatus newState,
+      final Optional<String> auditMessage) {
+
+    return changeState(query, new Function<TaskStateMachine, Boolean>() {
+      @Override public Boolean apply(TaskStateMachine stateMachine) {
+        return stateMachine.updateState(newState, auditMessage);
+      }
+    });
+  }
+
+  @Override
+  public IAssignedTask assignTask(
+      String taskId,
+      String slaveHost,
+      SlaveID slaveId,
+      Set<Integer> assignedPorts) {
+
+    checkNotBlank(taskId);
+    checkNotBlank(slaveHost);
+    checkNotNull(assignedPorts);
+
+    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
+    changeState(Query.taskScoped(taskId), mutation);
+
+    return mutation.getAssignedTask();
+  }
+
+  private int changeStateInWriteOperation(
+      Set<String> taskIds,
+      Function<TaskStateMachine, Boolean> stateChange) {
+
+    int count = 0;
+    for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
+      if (stateChange.apply(stateMachine)) {
+        ++count;
+      }
+    }
+    return count;
+  }
+
+  private int changeState(
+      final Query.Builder query,
+      final Function<TaskStateMachine, Boolean> stateChange) {
+
+    return storage.write(storage.new QuietSideEffectWork<Integer>() {
+      @Override public Integer apply(MutableStoreProvider storeProvider) {
+        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+            .transform(Tasks.SCHEDULED_TO_ID)
+            .toSet();
+        return changeStateInWriteOperation(ids, stateChange);
+      }
+    });
+  }
+
+  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
+    IAssignedTask getAssignedTask();
+  }
+
+  private static Map<String, Integer> getNameMappedPorts(
+      Set<String> portNames,
+      Set<Integer> allocatedPorts) {
+
+    Preconditions.checkNotNull(portNames);
+
+    // Expand ports.
+    Map<String, Integer> ports = Maps.newHashMap();
+    Set<Integer> portsRemaining = Sets.newHashSet(allocatedPorts);
+    Iterator<Integer> portConsumer = Iterables.consumingIterable(portsRemaining).iterator();
+
+    for (String portName : portNames) {
+      Preconditions.checkArgument(portConsumer.hasNext(),
+          "Allocated ports %s were not sufficient to expand task.", allocatedPorts);
+      int portNumber = portConsumer.next();
+      ports.put(portName, portNumber);
+    }
+
+    if (!portsRemaining.isEmpty()) {
+      LOG.warning("Not all allocated ports were used to map ports!");
+    }
+
+    return ports;
+  }
+
+  private TaskAssignMutation assignHost(
+      final String slaveHost,
+      final SlaveID slaveId,
+      final Set<Integer> assignedPorts) {
+
+    final TaskMutation mutation = new TaskMutation() {
+      @Override public IScheduledTask apply(IScheduledTask task) {
+        ScheduledTask builder = task.newBuilder();
+        AssignedTask assigned = builder.getAssignedTask();
+        assigned.setAssignedPorts(
+            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+        assigned.setSlaveHost(slaveHost)
+            .setSlaveId(slaveId.getValue());
+        return IScheduledTask.build(builder);
+      }
+    };
+
+    return new TaskAssignMutation() {
+      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
+      @Override public IAssignedTask getAssignedTask() {
+        return assignedTask.get();
+      }
+
+      @Override public Boolean apply(final TaskStateMachine stateMachine) {
+        TaskMutation wrapper = new TaskMutation() {
+          @Override public IScheduledTask apply(IScheduledTask task) {
+            IScheduledTask mutated = mutation.apply(task);
+            Preconditions.checkState(
+                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
+                "More than one result was found for an identity query.");
+            return mutated;
+          }
+        };
+        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+      }
+    };
+  }
+
+  private void processWorkQueueInWriteOperation(
+      SideEffectWork<?, ?> sideEffectWork,
+      MutableStoreProvider storeProvider) {
+
+    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
+      final TaskStateMachine stateMachine = work.stateMachine;
+
+      if (work.command == WorkCommand.KILL) {
+        driver.killTask(stateMachine.getTaskId());
+      } else {
+        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+        String taskId = stateMachine.getTaskId();
+        Query.Builder idQuery = Query.taskScoped(taskId);
+
+        switch (work.command) {
+          case RESCHEDULE:
+            ScheduledTask builder =
+                Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
+            builder.getAssignedTask().unsetSlaveId();
+            builder.getAssignedTask().unsetSlaveHost();
+            builder.getAssignedTask().unsetAssignedPorts();
+            builder.unsetTaskEvents();
+            builder.setAncestorId(taskId);
+            String newTaskId = taskIdGenerator.generate(
+                ITaskConfig.build(builder.getAssignedTask().getTask()),
+                builder.getAssignedTask().getInstanceId());
+            builder.getAssignedTask().setTaskId(newTaskId);
+
+            LOG.info("Task being rescheduled: " + taskId);
+
+            IScheduledTask task = IScheduledTask.build(builder);
+            taskStore.saveTasks(ImmutableSet.of(task));
+
+            createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
+            ITaskConfig taskInfo = task.getAssignedTask().getTask();
+            sideEffectWork.addTaskEvent(
+                new PubsubEvent.TaskRescheduled(
+                    taskInfo.getOwner().getRole(),
+                    taskInfo.getJobName(),
+                    task.getAssignedTask().getInstanceId()));
+            break;
+
+          case UPDATE_STATE:
+            taskStore.mutateTasks(idQuery, new TaskMutation() {
+              @Override public IScheduledTask apply(IScheduledTask task) {
+                return work.mutation.apply(
+                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
+              }
+            });
+            sideEffectWork.addTaskEvent(
+                new PubsubEvent.TaskStateChange(
+                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
+                    stateMachine.getPreviousState()));
+            break;
+
+          case DELETE:
+            deleteTasks(ImmutableSet.of(taskId));
+            break;
+
+          case INCREMENT_FAILURES:
+            taskStore.mutateTasks(idQuery, new TaskMutation() {
+              @Override public IScheduledTask apply(IScheduledTask task) {
+                return IScheduledTask.build(
+                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+              }
+            });
+            break;
+
+          default:
+            LOG.severe("Unrecognized work command type " + work.command);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    storage.write(storage.new NoResultQuietSideEffectWork() {
+      @Override protected void execute(final MutableStoreProvider storeProvider) {
+        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
+        taskStore.deleteTasks(taskIds);
+      }
+    });
+  }
+
+  private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
+    return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
+      @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
+        Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
+            new Function<IScheduledTask, String>() {
+              @Override public String apply(IScheduledTask input) {
+                return input.getAssignedTask().getTaskId();
+              }
+            });
+
+        ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
+        for (String taskId : taskIds) {
+          // Pass null get() values through.
+          builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
+        }
+        return builder.build();
+      }
+    });
+  }
+
+  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
+    if (task != null) {
+      return createStateMachine(task, task.getStatus());
+    }
+
+    // The task is unknown, not present in storage.
+    TaskStateMachine stateMachine = new TaskStateMachine(
+        taskId,
+        null,
+        workSink,
+        clock,
+        INIT);
+    stateMachine.updateState(UNKNOWN);
+    return stateMachine;
+  }
+
+  private TaskStateMachine createStateMachine(IScheduledTask task) {
+    return createStateMachine(task, INIT);
+  }
+
+  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
+    return new TaskStateMachine(
+        Tasks.id(task),
+        task,
+        workSink,
+        clock,
+        initialState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
new file mode 100644
index 0000000..870d085
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+
+import com.twitter.aurora.scheduler.MesosTaskFactory;
+import com.twitter.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
+import com.twitter.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
+import com.twitter.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
+
+/**
+ * Guice module wiring up scheduling logic and higher-level state management: offer matching,
+ * task state tracking, job managers and maintenance control.
+ */
+public class StateModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bindTaskPlacement();
+    bindStateTracking();
+
+    bindCronJobManager(binder());
+    bind(ImmediateJobManager.class).in(Singleton.class);
+
+    bindMaintenanceController(binder());
+  }
+
+  /**
+   * Binds offer matching, Mesos task construction and the scheduler core.
+   */
+  private void bindTaskPlacement() {
+    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
+    bind(TaskAssignerImpl.class).in(Singleton.class);
+    bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class);
+    bind(SchedulerCore.class).to(SchedulerCoreImpl.class).in(Singleton.class);
+  }
+
+  /**
+   * Binds the state manager, UUID generator and lock manager, each to a singleton-scoped impl.
+   */
+  private void bindStateTracking() {
+    bind(StateManager.class).to(StateManagerImpl.class);
+    bind(StateManagerImpl.class).in(Singleton.class);
+    bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
+    bind(UUIDGeneratorImpl.class).in(Singleton.class);
+    bind(LockManager.class).to(LockManagerImpl.class);
+    bind(LockManagerImpl.class).in(Singleton.class);
+  }
+
+  /**
+   * Binds {@link CronJobManager} as a singleton and registers it as a pubsub event subscriber.
+   *
+   * @param binder Binder to install the bindings into.
+   */
+  @VisibleForTesting
+  static void bindCronJobManager(Binder binder) {
+    binder.bind(CronJobManager.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
+  }
+
+  /**
+   * Binds {@link MaintenanceController} to its singleton implementation and registers that
+   * implementation as a pubsub event subscriber.
+   *
+   * @param binder Binder to install the bindings into.
+   */
+  @VisibleForTesting
+  static void bindMaintenanceController(Binder binder) {
+    binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
+    binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
new file mode 100644
index 0000000..c37feae
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.scheduler.MesosTaskFactory;
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Responsible for matching a task against an offer.
+ */
+public interface TaskAssigner {
+
+  /**
+   * Tries to match a task against an offer.  If a match is found, the assigner should
+   * make the appropriate changes to the task and provide a non-empty result.
+   *
+   * @param offer The resource offer.
+   * @param task The task to match against and optionally assign.
+   * @return Instructions for launching the task if matching and assignment were successful.
+   */
+  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
+
+  /**
+   * Default assigner.  It runs the task through the {@link SchedulingFilter}.  When no vetoes are
+   * raised, it records the assignment through {@link StateManager#assignTask} and builds the Mesos
+   * launch instructions.
+   */
+  class TaskAssignerImpl implements TaskAssigner {
+    private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
+
+    private final StateManager stateManager;
+    private final SchedulingFilter filter;
+    private final MesosTaskFactory taskFactory;
+
+    @Inject
+    public TaskAssignerImpl(
+        StateManager stateManager,
+        SchedulingFilter filter,
+        MesosTaskFactory taskFactory) {
+
+      this.stateManager = checkNotNull(stateManager);
+      this.filter = checkNotNull(filter);
+      this.taskFactory = checkNotNull(taskFactory);
+    }
+
+    /**
+     * Records the assignment of {@code task} to the offer's slave and builds launch instructions.
+     *
+     * @param offer Offer whose host, slave and ports the task is assigned to.
+     * @param task Task to assign.
+     * @return Launch instructions, or absent if the state manager did not apply the assignment.
+     *     {@link StateManager#assignTask} signals that case by returning {@code null}.
+     */
+    private Optional<TaskInfo> assign(Offer offer, IScheduledTask task) {
+      String host = offer.getHostname();
+      String taskId = Tasks.id(task);
+      Set<Integer> selectedPorts =
+          Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
+      IAssignedTask assigned = stateManager.assignTask(
+          taskId,
+          host,
+          offer.getSlaveId(),
+          selectedPorts);
+      if (assigned == null) {
+        // Presumably the task was removed or changed state after it was read; treat this as
+        // no match instead of building launch instructions from a null record.
+        LOG.warning(String.format("Failed to assign task %s to slave %s (id %s).",
+            taskId, host, offer.getSlaveId().getValue()));
+        return Optional.absent();
+      }
+      LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
+          host, offer.getSlaveId().getValue(), taskId));
+      return Optional.of(taskFactory.createFrom(assigned, offer.getSlaveId()));
+    }
+
+    @Override
+    public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
+      Set<Veto> vetoes = filter.filter(
+          ResourceSlot.from(offer),
+          offer.getHostname(),
+          task.getAssignedTask().getTask(),
+          Tasks.id(task));
+      if (vetoes.isEmpty()) {
+        return assign(offer, task);
+      } else {
+        LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
+            + ": " + vetoes);
+        return Optional.absent();
+      }
+    }
+  }
+}