You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:36 UTC

[43/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
deleted file mode 100644
index cf74e49..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.TaskIdGenerator;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.Positive;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
-import static com.twitter.aurora.scheduler.base.Tasks.ACTIVE_STATES;
-
-/**
- * Implementation of the scheduler core.
- */
-class SchedulerCoreImpl implements SchedulerCore {
-  @Positive
-  @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
-  public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
-
-  private static final Logger LOG = Logger.getLogger(SchedulerCoreImpl.class.getName());
-
-  private final Storage storage;
-
-  private final CronJobManager cronScheduler;
-
-  // Schedulers that are responsible for triggering execution of jobs.
-  private final ImmutableList<JobManager> jobManagers;
-
-  // TODO(Bill Farner): Avoid using StateManagerImpl.
-  // State manager handles persistence of task modifications and state transitions.
-  private final StateManagerImpl stateManager;
-
-  private final TaskIdGenerator taskIdGenerator;
-  private final JobFilter jobFilter;
-
-  /**
-   * Creates a new core scheduler.
-   *
-   * @param storage Backing store implementation.
-   * @param cronScheduler Cron scheduler.
-   * @param immediateScheduler Immediate scheduler.
-   * @param stateManager Persistent state manager.
-   * @param taskIdGenerator Task ID generator.
-   * @param jobFilter Job filter.
-   */
-  @Inject
-  public SchedulerCoreImpl(
-      Storage storage,
-      CronJobManager cronScheduler,
-      ImmediateJobManager immediateScheduler,
-      StateManagerImpl stateManager,
-      TaskIdGenerator taskIdGenerator,
-      JobFilter jobFilter) {
-
-    this.storage = checkNotNull(storage);
-
-    // The immediate scheduler will accept any job, so it's important that other schedulers are
-    // placed first.
-    this.jobManagers = ImmutableList.of(cronScheduler, immediateScheduler);
-    this.cronScheduler = cronScheduler;
-    this.stateManager = checkNotNull(stateManager);
-    this.taskIdGenerator = checkNotNull(taskIdGenerator);
-    this.jobFilter = checkNotNull(jobFilter);
-  }
-
-  private boolean hasActiveJob(IJobConfiguration job) {
-    return Iterables.any(jobManagers, managerHasJob(job));
-  }
-
-  @Override
-  public synchronized void tasksDeleted(Set<String> taskIds) {
-    setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
-  }
-
-  @Override
-  public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
-      throws ScheduleException {
-
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    if (hasActiveJob(job)) {
-      throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
-    }
-
-    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
-
-    boolean accepted = false;
-    for (final JobManager manager : jobManagers) {
-      if (manager.receiveJob(sanitizedConfiguration)) {
-        LOG.info("Job accepted by manager: " + manager.getUniqueKey());
-        accepted = true;
-        break;
-      }
-    }
-
-    if (!accepted) {
-      LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
-      LOG.severe("Discarded job: " + job);
-      throw new ScheduleException("Job not accepted, discarding.");
-    }
-  }
-
-  // This number is derived from the maximum file name length limit on most UNIX systems, less
-  // the number of characters we've observed being added by mesos for the executor ID, prefix, and
-  // delimiters.
-  @VisibleForTesting
-  static final int MAX_TASK_ID_LENGTH = 255 - 90;
-
-  // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
-  private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
-      throws ScheduleException {
-
-    int instanceCount = count;
-    if (incremental) {
-      instanceCount +=
-          Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size();
-    }
-
-    // TODO(maximk): This is a short-term hack to stop the bleeding from
-    //               https://issues.apache.org/jira/browse/MESOS-691
-    if (taskIdGenerator.generate(task, instanceCount).length() > MAX_TASK_ID_LENGTH) {
-      throw new ScheduleException(
-          "Task ID is too long, please shorten your role or job name.");
-    }
-
-    JobFilter.JobFilterResult filterResult = jobFilter.filter(task, instanceCount);
-    // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
-    if (!filterResult.isPass()) {
-      throw new ScheduleException(filterResult.getReason());
-    }
-
-    if (instanceCount > MAX_TASKS_PER_JOB.get()) {
-      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
-    }
-  }
-
-  @Override
-  public void validateJobResources(SanitizedConfiguration sanitizedConfiguration)
-      throws ScheduleException {
-
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
-  }
-
-  @Override
-  public void addInstances(
-      final IJobKey jobKey,
-      final ImmutableSet<Integer> instanceIds,
-      final ITaskConfig config) throws ScheduleException {
-
-    runJobFilters(jobKey, config, instanceIds.size(), true);
-    storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider)
-          throws ScheduleException {
-
-        ImmutableSet<IScheduledTask> tasks =
-            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
-
-        Set<Integer> existingInstanceIds =
-            FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
-        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
-          throw new ScheduleException("Instance ID collision detected.");
-        }
-
-        stateManager.insertPendingTasks(Maps.asMap(instanceIds, Functions.constant(config)));
-      }
-    });
-  }
-
-  @Override
-  public synchronized void startCronJob(IJobKey jobKey)
-      throws ScheduleException, TaskDescriptionException {
-
-    checkNotNull(jobKey);
-
-    if (!cronScheduler.hasJob(jobKey)) {
-      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
-    }
-
-    cronScheduler.startJobNow(jobKey);
-  }
-
-  /**
-   * Creates a predicate that will determine whether a job manager has a job matching a job key.
-   *
-   * @param job Job to match.
-   * @return A new predicate matching the job owner and name given.
-   */
-  private static Predicate<JobManager> managerHasJob(final IJobConfiguration job) {
-    return new Predicate<JobManager>() {
-      @Override public boolean apply(JobManager manager) {
-        return manager.hasJob(job.getKey());
-      }
-    };
-  }
-
-  @Override
-  public synchronized void setTaskStatus(
-      Query.Builder query,
-      final ScheduleStatus status,
-      Optional<String> message) {
-
-    checkNotNull(query);
-    checkNotNull(status);
-
-    stateManager.changeState(query, status, message);
-  }
-
-  @Override
-  public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
-    checkNotNull(query);
-    LOG.info("Killing tasks matching " + query);
-
-    boolean jobDeleted = false;
-
-    if (Query.isOnlyJobScoped(query)) {
-      // If this looks like a query for all tasks in a job, instruct the scheduler modules to
-      // delete the job.
-      IJobKey jobKey = JobKeys.from(query).get();
-      for (JobManager manager : jobManagers) {
-        if (manager.deleteJob(jobKey)) {
-          jobDeleted = true;
-        }
-      }
-    }
-
-    // Unless statuses were specifically supplied, only attempt to kill active tasks.
-    Query.Builder taskQuery = query.get().isSetStatuses() ? query.byStatus(ACTIVE_STATES) : query;
-
-    int tasksAffected =
-        stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
-    if (!jobDeleted && (tasksAffected == 0)) {
-      throw new ScheduleException("No jobs to kill");
-    }
-  }
-
-  @Override
-  public void restartShards(
-      IJobKey jobKey,
-      final Set<Integer> shards,
-      final String requestingUser) throws ScheduleException {
-
-    if (!JobKeys.isValid(jobKey)) {
-      throw new ScheduleException("Invalid job key: " + jobKey);
-    }
-
-    if (shards.isEmpty()) {
-      throw new ScheduleException("At least one shard must be specified.");
-    }
-
-    final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
-    storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override protected void execute(MutableStoreProvider storeProvider)
-          throws ScheduleException {
-
-        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
-        if (matchingTasks.size() != shards.size()) {
-          throw new ScheduleException("Not all requested shards are active.");
-        }
-        LOG.info("Restarting shards matching " + query);
-        stateManager.changeState(
-            Query.taskScoped(Tasks.ids(matchingTasks)),
-            RESTARTING,
-            Optional.of("Restarted by " + requestingUser));
-      }
-    });
-  }
-
-
-  @Override
-  public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
-    checkNotNull(task);
-    checkNotNull(preemptingTask);
-    // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
-
-    stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
-        Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 23ffff9..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.twitter.aurora.scheduler.events.PubsubEvent;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StorageException;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.common.base.Closure;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
-  private final Queue<PubsubEvent> events = Lists.newLinkedList();
-  @VisibleForTesting
-  Queue<PubsubEvent> getEvents() {
-    return events;
-  }
-
-  private AtomicBoolean inOperation = new AtomicBoolean(false);
-
-  private final Storage storage;
-  private final OperationFinalizer operationFinalizer;
-  private final Closure<PubsubEvent> taskEventSink;
-
-  interface OperationFinalizer {
-    /**
-     * Performs any work necessary to complete the operation.
-     * This is executed in the context of a write operation, immediately after the work
-     * executes normally.
-     * NOTE: At present, this is executed for every nesting level of operations, rather than
-     * at the completion of the top-level operation.
-     * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
-     * for more detail.
-     *
-     * @param work Work to finalize.
-     * @param storeProvider Mutable store reference.
-     */
-    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
-  }
-
-  SideEffectStorage(
-      Storage storage,
-      OperationFinalizer operationFinalizer,
-      Closure<PubsubEvent> taskEventSink) {
-
-    this.storage = checkNotNull(storage);
-    this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.taskEventSink = checkNotNull(taskEventSink);
-  }
-
-  /**
-   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
-   *
-   * @param work Work to perform.
-   * @param <T> Work return type
-   * @param <E> Work exception type.
-   * @return The work return value.
-   * @throws E The work exception.
-   */
-  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-    return storage.write(executeSideEffectsAfter(work));
-  }
-
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  /**
-   * Work that has side effects external to the storage system.
-   * Work may add side effect and pubsub events, which will be executed/sent upon normal
-   * completion of the operation.
-   *
-   * @param <T> Work return type.
-   * @param <E> Work exception type.
-   */
-  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
-    protected final void addTaskEvent(PubsubEvent notice) {
-      Preconditions.checkState(inOperation.get());
-      events.add(Preconditions.checkNotNull(notice));
-    }
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions.
-   *
-   * @param <T>   Work return type.
-   */
-  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
-  }
-
-  /**
-   * Work with side effects that does not have a return value.
-   *
-   * @param <E> Work exception type.
-   */
-  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-      execute(storeProvider);
-      return null;
-    }
-
-    abstract void execute(MutableStoreProvider storeProvider) throws E;
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions or have a return
-   * value.
-   */
-  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
-  }
-
-  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
-      final SideEffectWork<T, E> work) {
-
-    return new MutateWork<T, E>() {
-      @Override public T apply(MutableStoreProvider storeProvider) throws E {
-        boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
-        try {
-          T result = work.apply(storeProvider);
-
-          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
-          // seems wrong.  Double-check whether this is necessary, or if only the top-level
-          // operation should be executing the finalizer.  Update doc on OperationFinalizer
-          // once this is assessed.
-          operationFinalizer.finalize(work, storeProvider);
-          if (topLevelOperation) {
-            while (!events.isEmpty()) {
-              taskEventSink.execute(events.remove());
-            }
-          }
-          return result;
-        } finally {
-          if (topLevelOperation) {
-            inOperation.set(false);
-          }
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
deleted file mode 100644
index 099ec70..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.SlaveID;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Thin interface for the state manager.
- */
-public interface StateManager {
-
-  /**
-   * Performs a simple state change, transitioning all tasks matching a query to the given
-   * state and applying the given audit message.
-   * TODO(William Farner): Consider removing the return value.
-   *
-   * @param query Builder of the query to perform, the results of which will be modified.
-   * @param newState State to move the resulting tasks into.
-   * @param auditMessage Audit message to apply along with the state change.
-   * @return the number of successful state changes.
-   */
-  int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
-
-  /**
-   * Assigns a task to a specific slave.
-   * This will modify the task record to reflect the host assignment and return the updated record.
-   *
-   * @param taskId ID of the task to mutate.
-   * @param slaveHost Host name that the task is being assigned to.
-   * @param slaveId ID of the slave that the task is being assigned to.
-   * @param assignedPorts Ports on the host that are being assigned to the task.
-   * @return The updated task record, or {@code null} if the task was not found.
-   */
-  IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts);
-
-  /**
-   * Inserts new tasks into the store. Tasks will immediately move into PENDING and will be eligible
-   * for scheduling.
-   *
-   * @param tasks Tasks to insert, mapped by their instance IDs.
-   */
-  void insertPendingTasks(Map<Integer, ITaskConfig> tasks);
-
-  /**
-   * Deletes records of tasks from the task store.
-   * This will not perform any state checking or state transitions, but will immediately remove
-   * the tasks from the store.  It will also silently ignore attempts to delete task IDs that do
-   * not exist.
-   *
-   * @param taskIds IDs of tasks to delete.
-   */
-  void deleteTasks(final Set<String> taskIds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
deleted file mode 100644
index 37d13f4..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-
-import org.apache.mesos.Protos.SlaveID;
-
-import com.twitter.aurora.gen.AssignedTask;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.scheduler.Driver;
-import com.twitter.aurora.scheduler.TaskIdGenerator;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent;
-import com.twitter.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.base.Closure;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Iterables.transform;
-
-import static com.twitter.aurora.gen.ScheduleStatus.INIT;
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.UNKNOWN;
-import static com.twitter.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
- * persisted state machine transitions, and their side-effects.
- *
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
- */
-public class StateManagerImpl implements StateManager {
-  private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
-
-  private final SideEffectStorage storage;
-  @VisibleForTesting
-  SideEffectStorage getStorage() {
-    return storage;
-  }
-
-  private final TaskIdGenerator taskIdGenerator;
-
-  // Work queue to receive state machine side effect work.
-  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
-  // a delete is always processed after a state transition.
-  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
-      new Comparator<WorkEntry>() {
-        @Override public int compare(WorkEntry a, WorkEntry b) {
-          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
-            return (a.command == WorkCommand.DELETE) ? 1 : -1;
-          } else {
-            return 0;
-          }
-        }
-      });
-
-  // Adapt the work queue into a sink.
-  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
-      @Override public void addWork(
-          WorkCommand work,
-          TaskStateMachine stateMachine,
-          Function<IScheduledTask, IScheduledTask> mutation) {
-
-        workQueue.add(new WorkEntry(work, stateMachine, mutation));
-      }
-    };
-
-  private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
-      new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
-        @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
-          ITaskConfig task = entry.getValue();
-          AssignedTask assigned = new AssignedTask()
-              .setTaskId(taskIdGenerator.generate(task, entry.getKey()))
-              .setInstanceId(entry.getKey())
-              .setTask(task.newBuilder());
-          return IScheduledTask.build(new ScheduledTask()
-              .setStatus(INIT)
-              .setAssignedTask(assigned));
-        }
-      };
-
-  private final Driver driver;
-  private final Clock clock;
-
-  /**
-   * An item of work on the work queue.
-   */
-  private static class WorkEntry {
-    private final WorkCommand command;
-    private final TaskStateMachine stateMachine;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    WorkEntry(
-        WorkCommand command,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      this.command = command;
-      this.stateMachine = stateMachine;
-      this.mutation = mutation;
-    }
-  }
-
-  @Inject
-  StateManagerImpl(
-      final Storage storage,
-      final Clock clock,
-      Driver driver,
-      TaskIdGenerator taskIdGenerator,
-      Closure<PubsubEvent> taskEventSink) {
-
-    checkNotNull(storage);
-    this.clock = checkNotNull(clock);
-
-    OperationFinalizer finalizer = new OperationFinalizer() {
-      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
-        processWorkQueueInWriteOperation(work, store);
-      }
-    };
-
-    this.storage = new SideEffectStorage(storage, finalizer, taskEventSink);
-
-    this.driver = checkNotNull(driver);
-    this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
-    Stats.exportSize("work_queue_depth", workQueue);
-  }
-
-  @Override
-  public void insertPendingTasks(final Map<Integer, ITaskConfig> tasks) {
-    checkNotNull(tasks);
-
-    // Done outside the write transaction to minimize the work done inside a transaction.
-    final Set<IScheduledTask> scheduledTasks =
-        ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
-
-    storage.write(storage.new NoResultQuietSideEffectWork() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
-
-        for (IScheduledTask task : scheduledTasks) {
-          createStateMachine(task).updateState(PENDING);
-        }
-      }
-    });
-  }
-
-  @Override
-  public int changeState(
-      Query.Builder query,
-      final ScheduleStatus newState,
-      final Optional<String> auditMessage) {
-
-    return changeState(query, new Function<TaskStateMachine, Boolean>() {
-      @Override public Boolean apply(TaskStateMachine stateMachine) {
-        return stateMachine.updateState(newState, auditMessage);
-      }
-    });
-  }
-
-  @Override
-  public IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts) {
-
-    checkNotBlank(taskId);
-    checkNotBlank(slaveHost);
-    checkNotNull(assignedPorts);
-
-    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(Query.taskScoped(taskId), mutation);
-
-    return mutation.getAssignedTask();
-  }
-
-  private int changeStateInWriteOperation(
-      Set<String> taskIds,
-      Function<TaskStateMachine, Boolean> stateChange) {
-
-    int count = 0;
-    for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
-      if (stateChange.apply(stateMachine)) {
-        ++count;
-      }
-    }
-    return count;
-  }
-
-  private int changeState(
-      final Query.Builder query,
-      final Function<TaskStateMachine, Boolean> stateChange) {
-
-    return storage.write(storage.new QuietSideEffectWork<Integer>() {
-      @Override public Integer apply(MutableStoreProvider storeProvider) {
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        return changeStateInWriteOperation(ids, stateChange);
-      }
-    });
-  }
-
-  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
-    IAssignedTask getAssignedTask();
-  }
-
-  private static Map<String, Integer> getNameMappedPorts(
-      Set<String> portNames,
-      Set<Integer> allocatedPorts) {
-
-    Preconditions.checkNotNull(portNames);
-
-    // Expand ports.
-    Map<String, Integer> ports = Maps.newHashMap();
-    Set<Integer> portsRemaining = Sets.newHashSet(allocatedPorts);
-    Iterator<Integer> portConsumer = Iterables.consumingIterable(portsRemaining).iterator();
-
-    for (String portName : portNames) {
-      Preconditions.checkArgument(portConsumer.hasNext(),
-          "Allocated ports %s were not sufficient to expand task.", allocatedPorts);
-      int portNumber = portConsumer.next();
-      ports.put(portName, portNumber);
-    }
-
-    if (!portsRemaining.isEmpty()) {
-      LOG.warning("Not all allocated ports were used to map ports!");
-    }
-
-    return ports;
-  }
-
-  private TaskAssignMutation assignHost(
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Set<Integer> assignedPorts) {
-
-    final TaskMutation mutation = new TaskMutation() {
-      @Override public IScheduledTask apply(IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        AssignedTask assigned = builder.getAssignedTask();
-        assigned.setAssignedPorts(
-            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-        assigned.setSlaveHost(slaveHost)
-            .setSlaveId(slaveId.getValue());
-        return IScheduledTask.build(builder);
-      }
-    };
-
-    return new TaskAssignMutation() {
-      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
-      @Override public IAssignedTask getAssignedTask() {
-        return assignedTask.get();
-      }
-
-      @Override public Boolean apply(final TaskStateMachine stateMachine) {
-        TaskMutation wrapper = new TaskMutation() {
-          @Override public IScheduledTask apply(IScheduledTask task) {
-            IScheduledTask mutated = mutation.apply(task);
-            Preconditions.checkState(
-                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
-                "More than one result was found for an identity query.");
-            return mutated;
-          }
-        };
-        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
-      }
-    };
-  }
-
-  private void processWorkQueueInWriteOperation(
-      SideEffectWork<?, ?> sideEffectWork,
-      MutableStoreProvider storeProvider) {
-
-    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
-      final TaskStateMachine stateMachine = work.stateMachine;
-
-      if (work.command == WorkCommand.KILL) {
-        driver.killTask(stateMachine.getTaskId());
-      } else {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        String taskId = stateMachine.getTaskId();
-        Query.Builder idQuery = Query.taskScoped(taskId);
-
-        switch (work.command) {
-          case RESCHEDULE:
-            ScheduledTask builder =
-                Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
-            builder.getAssignedTask().unsetSlaveId();
-            builder.getAssignedTask().unsetSlaveHost();
-            builder.getAssignedTask().unsetAssignedPorts();
-            builder.unsetTaskEvents();
-            builder.setAncestorId(taskId);
-            String newTaskId = taskIdGenerator.generate(
-                ITaskConfig.build(builder.getAssignedTask().getTask()),
-                builder.getAssignedTask().getInstanceId());
-            builder.getAssignedTask().setTaskId(newTaskId);
-
-            LOG.info("Task being rescheduled: " + taskId);
-
-            IScheduledTask task = IScheduledTask.build(builder);
-            taskStore.saveTasks(ImmutableSet.of(task));
-
-            createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
-            ITaskConfig taskInfo = task.getAssignedTask().getTask();
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskRescheduled(
-                    taskInfo.getOwner().getRole(),
-                    taskInfo.getJobName(),
-                    task.getAssignedTask().getInstanceId()));
-            break;
-
-          case UPDATE_STATE:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return work.mutation.apply(
-                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
-              }
-            });
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskStateChange(
-                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
-                    stateMachine.getPreviousState()));
-            break;
-
-          case DELETE:
-            deleteTasks(ImmutableSet.of(taskId));
-            break;
-
-          case INCREMENT_FAILURES:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return IScheduledTask.build(
-                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
-              }
-            });
-            break;
-
-          default:
-            LOG.severe("Unrecognized work command type " + work.command);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    storage.write(storage.new NoResultQuietSideEffectWork() {
-      @Override protected void execute(final MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
-        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
-        taskStore.deleteTasks(taskIds);
-      }
-    });
-  }
-
-  private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
-    return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
-      @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
-        Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
-            new Function<IScheduledTask, String>() {
-              @Override public String apply(IScheduledTask input) {
-                return input.getAssignedTask().getTaskId();
-              }
-            });
-
-        ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
-        for (String taskId : taskIds) {
-          // Pass null get() values through.
-          builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
-        }
-        return builder.build();
-      }
-    });
-  }
-
-  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-    if (task != null) {
-      return createStateMachine(task, task.getStatus());
-    }
-
-    // The task is unknown, not present in storage.
-    TaskStateMachine stateMachine = new TaskStateMachine(
-        taskId,
-        null,
-        workSink,
-        clock,
-        INIT);
-    stateMachine.updateState(UNKNOWN);
-    return stateMachine;
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task) {
-    return createStateMachine(task, INIT);
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-    return new TaskStateMachine(
-        Tasks.id(task),
-        task,
-        workSink,
-        clock,
-        initialState);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java b/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
deleted file mode 100644
index 870d085..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-
-import com.twitter.aurora.scheduler.MesosTaskFactory;
-import com.twitter.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
-import com.twitter.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import com.twitter.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
-
-/**
- * Binding module for scheduling logic and higher-level state management.
- */
-public class StateModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
-    bind(TaskAssignerImpl.class).in(Singleton.class);
-    bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class);
-
-    bind(SchedulerCore.class).to(SchedulerCoreImpl.class).in(Singleton.class);
-
-    bind(StateManager.class).to(StateManagerImpl.class);
-    bind(StateManagerImpl.class).in(Singleton.class);
-
-    bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
-    bind(UUIDGeneratorImpl.class).in(Singleton.class);
-    bind(LockManager.class).to(LockManagerImpl.class);
-    bind(LockManagerImpl.class).in(Singleton.class);
-
-    bindCronJobManager(binder());
-    bind(ImmediateJobManager.class).in(Singleton.class);
-
-    bindMaintenanceController(binder());
-  }
-
-  @VisibleForTesting
-  static void bindCronJobManager(Binder binder) {
-    binder.bind(CronJobManager.class).in(Singleton.class);
-    PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
-  }
-
-  @VisibleForTesting
-  static void bindMaintenanceController(Binder binder) {
-    binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
-    binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
-    PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
deleted file mode 100644
index c37feae..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.scheduler.MesosTaskFactory;
-import com.twitter.aurora.scheduler.ResourceSlot;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Responsible for matching a task against an offer.
- */
-public interface TaskAssigner {
-
-  /**
-   * Tries to match a task against an offer.  If a match is found, the assigner should
-   * make the appropriate changes to the task and provide a non-empty result.
-   *
-   * @param offer The resource offer.
-   * @param task The task to match against and optionally assign.
-   * @return Instructions for launching the task if matching and assignment were successful.
-   */
-  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
-
-  class TaskAssignerImpl implements TaskAssigner {
-    private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
-
-    private final StateManager stateManager;
-    private final SchedulingFilter filter;
-    private final MesosTaskFactory taskFactory;
-
-    @Inject
-    public TaskAssignerImpl(
-        StateManager stateManager,
-        SchedulingFilter filter,
-        MesosTaskFactory taskFactory) {
-
-      this.stateManager = checkNotNull(stateManager);
-      this.filter = checkNotNull(filter);
-      this.taskFactory = checkNotNull(taskFactory);
-    }
-
-    private TaskInfo assign(Offer offer, IScheduledTask task) {
-      String host = offer.getHostname();
-      Set<Integer> selectedPorts =
-          Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
-      IAssignedTask assigned = stateManager.assignTask(
-          Tasks.id(task),
-          host,
-          offer.getSlaveId(),
-          selectedPorts);
-      LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
-          host, offer.getSlaveId(), Tasks.id(task)));
-      return taskFactory.createFrom(assigned, offer.getSlaveId());
-    }
-
-    @Override
-    public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
-      Set<Veto> vetoes = filter.filter(
-          ResourceSlot.from(offer),
-          offer.getHostname(),
-          task.getAssignedTask().getTask(),
-          Tasks.id(task));
-      if (vetoes.isEmpty()) {
-        return Optional.of(assign(offer, task));
-      } else {
-        LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
-            + ": " + vetoes);
-        return Optional.absent();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
deleted file mode 100644
index f32fd14..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.TaskEvent;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.StateMachine;
-import com.twitter.common.util.StateMachine.Rule;
-import com.twitter.common.util.StateMachine.Transition;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * State machine for a task.
- * <p>
- * This enforces the lifecycle of a task, and triggers the actions that should be taken in response
- * to different state transitions.  These responses are externally communicated by populating a
- * provided work queue.
- * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- *     abstractly from the consumption side.
- */
-class TaskStateMachine {
-  private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
-
-  private static final AtomicLong ILLEGAL_TRANSITIONS =
-      Stats.exportLong("scheduler_illegal_task_state_transitions");
-
-  // Re-declarations of statuses as wrapped state objects.
-  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
-  private static final State FAILED = State.create(ScheduleStatus.FAILED);
-  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
-  private static final State INIT = State.create(ScheduleStatus.INIT);
-  private static final State KILLED = State.create(ScheduleStatus.KILLED);
-  private static final State KILLING = State.create(ScheduleStatus.KILLING);
-  private static final State LOST = State.create(ScheduleStatus.LOST);
-  private static final State PENDING = State.create(ScheduleStatus.PENDING);
-  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
-  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
-  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
-  private static final State STARTING = State.create(ScheduleStatus.STARTING);
-  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
-  @VisibleForTesting
-  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
-        }
-      });
-
-  private final String taskId;
-  private final WorkSink workSink;
-  private final StateMachine<State> stateMachine;
-  private ScheduleStatus previousState = null;
-  private final Clock clock;
-
-  /**
-   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
-   * States must be equal for them to be considered equal.
-   */
-  private static class State {
-    private final ScheduleStatus state;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-      this.state = state;
-      this.mutation = mutation;
-    }
-
-    static State create(ScheduleStatus status) {
-      return create(status, Functions.<IScheduledTask>identity());
-    }
-
-    static State create(
-        ScheduleStatus status,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      return new State(status, mutation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof State)) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      State other = (State) o;
-      return state == other.state;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder()
-          .append(state)
-          .toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return state.toString();
-    }
-
-    private ScheduleStatus getState() {
-      return state;
-    }
-
-    private Function<IScheduledTask, IScheduledTask> getMutation() {
-      return mutation;
-    }
-  }
-
-  /**
-   * A write-only work acceptor.
-   */
-  public interface WorkSink {
-    /**
-     * Appends external work that must be performed for a state machine transition to be fully
-     * complete.
-     *
-     * @param work Description of the work to be performed.
-     * @param stateMachine The state machine that the work is associated with.
-     * @param mutation Mutate operation to perform along with the state transition.
-     */
-    void addWork(
-        WorkCommand work,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation);
-  }
-
-  /**
-   * Creates a new task state machine.
-   *
-   * @param taskId ID of the task managed by this state machine.
-   * @param task Read-only task that this state machine manages.
-   * @param workSink Work sink to receive transition response actions
-   * @param clock Clock to use for reading the current time.
-   * @param initialState The state to begin the state machine at.  All legal transitions will be
-   *     added, but this allows the state machine to 'skip' states, for instance when a task is
-   *     loaded from a persistent store.
-   */
-  public TaskStateMachine(
-      final String taskId,
-      final IScheduledTask task,
-      final WorkSink workSink,
-      final Clock clock,
-      final ScheduleStatus initialState) {
-
-    this.taskId = MorePreconditions.checkNotBlank(taskId);
-    this.workSink = checkNotNull(workSink);
-    this.clock = checkNotNull(clock);
-    checkNotNull(initialState);
-
-    @SuppressWarnings("unchecked")
-    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
-        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
-        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
-            addWorkClosure(WorkCommand.KILL)),
-        /* Remove a terminated task that is remotely removed. */
-        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
-    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
-      @SuppressWarnings("fallthrough")
-      @Override public void execute(Transition<State> transition) {
-        switch (transition.getTo().getState()) {
-          case ASSIGNED:
-          case STARTING:
-          case RUNNING:
-            addWork(WorkCommand.KILL);
-            break;
-
-          case LOST:
-            addWork(WorkCommand.KILL);
-            // fall through
-
-          case FINISHED:
-          case FAILED:
-          case KILLED:
-            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
-            break;
-
-          case UNKNOWN:
-            updateState(ScheduleStatus.LOST);
-            break;
-
-          default:
-            // No-op.
-        }
-      }
-    };
-
-    // To be called on a task transitioning into the FINISHED state.
-    final Command rescheduleIfService = new Command() {
-      @Override public void execute() {
-        if (task.getAssignedTask().getTask().isIsService()) {
-          addWork(WorkCommand.RESCHEDULE);
-        }
-      }
-    };
-
-    // To be called on a task transitioning into the FAILED state.
-    final Command incrementFailuresMaybeReschedule = new Command() {
-      @Override public void execute() {
-        addWork(WorkCommand.INCREMENT_FAILURES);
-
-        // Max failures is ignored for service task.
-        boolean isService = task.getAssignedTask().getTask().isIsService();
-
-        // Max failures is ignored when set to -1.
-        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
-        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
-          addWork(WorkCommand.RESCHEDULE);
-        } else {
-          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
-        }
-      }
-    };
-
-    stateMachine = StateMachine.<State>builder(taskId)
-        .logTransitions()
-        .initialState(State.create(initialState))
-        .addState(
-            Rule.from(INIT)
-                .to(PENDING, UNKNOWN))
-        .addState(
-            Rule.from(PENDING)
-                .to(ASSIGNED, KILLING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case KILLING:
-                            addWork(WorkCommand.DELETE);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
-                      }
-                    }
-                ))
-        .addState(
-            Rule.from(ASSIGNED)
-                .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
-                    KILLING, LOST, PREEMPTING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case PREEMPTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case RESTARTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
-                            break;
-
-                          case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            // fall through
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case UNKNOWN:
-                            break;
-
-                           default:
-                             // No-op.
-                        }
-                      }
-                    }
-                ))
-        .addState(
-            Rule.from(STARTING)
-                .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case RESTARTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case PREEMPTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
-                            break;
-
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            break;
-
-                          case UNKNOWN:
-                            // The slave previously acknowledged that it had the task, and now
-                            // stopped reporting it.
-                            updateState(ScheduleStatus.LOST);
-                            break;
-
-                           default:
-                             // No-op.
-                        }
-                      }
-                    }
-                ))
-        .addState(
-            Rule.from(RUNNING)
-                .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case PREEMPTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case RESTARTING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
-                            break;
-
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
-                            break;
-
-                          case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            break;
-
-                          case UNKNOWN:
-                            updateState(ScheduleStatus.LOST);
-                            break;
-
-                           default:
-                             // No-op.
-                        }
-                      }
-                    }
-                ))
-        .addState(
-            Rule.from(FINISHED)
-                .to(UNKNOWN)
-                .withCallback(manageTerminatedTasks))
-        .addState(
-            Rule.from(PREEMPTING)
-                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
-                .withCallback(manageRestartingTask))
-        .addState(
-            Rule.from(RESTARTING)
-                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
-                .withCallback(manageRestartingTask))
-        .addState(
-            Rule.from(FAILED)
-                .to(UNKNOWN)
-                .withCallback(manageTerminatedTasks))
-        .addState(
-            Rule.from(KILLED)
-                .to(UNKNOWN)
-                .withCallback(manageTerminatedTasks))
-        .addState(
-            Rule.from(KILLING)
-                .to(FINISHED, FAILED, KILLED, LOST, UNKNOWN)
-                .withCallback(manageTerminatedTasks))
-        .addState(
-            Rule.from(LOST)
-                .to(UNKNOWN)
-                .withCallback(manageTerminatedTasks))
-        .addState(
-            Rule.from(UNKNOWN)
-                .noTransitions()
-                .withCallback(manageTerminatedTasks))
-        // Since we want this action to be performed last in the transition sequence, the callback
-        // must be the last chained transition callback.
-        .onAnyTransition(
-            new Closure<Transition<State>>() {
-              @Override public void execute(final Transition<State> transition) {
-                ScheduleStatus from = transition.getFrom().getState();
-                ScheduleStatus to = transition.getTo().getState();
-
-                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
-                    // Prevent an update when killing a pending task, since the task is deleted
-                    // prior to the update.
-                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
-                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
-                } else if (!transition.isAllowed()) {
-                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
-                  ILLEGAL_TRANSITIONS.incrementAndGet();
-                }
-
-                if (transition.isValidStateChange()) {
-                  previousState = from;
-                }
-              }
-            }
-        )
-        // TODO(wfarner): Consider alternatives to allow exceptions to surface.  This would allow
-        // the state machine to surface illegal state transitions and propagate better information
-        // to the caller.  As it stands, the caller must implement logic that really belongs in
-        // the state machine.  For example, preventing RESTARTING->UPDATING transitions
-        // (or for that matter, almost any user-initiated state transition) is awkward.
-        .throwOnBadTransition(false)
-        .build();
-  }
-
-  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-    return new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> item) {
-        addWork(work);
-      }
-    };
-  }
-
-  private void addWork(WorkCommand work) {
-    addWork(work, Functions.<IScheduledTask>identity());
-  }
-
-  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-    LOG.info("Adding work command " + work + " for " + this);
-    workSink.addWork(work, TaskStateMachine.this, mutation);
-  }
-
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status) {
-    return updateState(status, Functions.<IScheduledTask>identity());
-  }
-
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The (optional) audit message to associate with the transition.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
-    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
-  }
-
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
-   *
-   * @param status Status to apply to the task.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation) {
-
-    return updateState(status, mutation, Optional.<String>absent());
-  }
-
-  /**
-   * Attempt to transition the state machine to the provided state.
-   * At the time this method returns, any work commands required to satisfy the state transition
-   * will be appended to the work queue.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The audit message to associate with the transition.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      final ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation,
-      final Optional<String> auditMessage) {
-
-    checkNotNull(status);
-    checkNotNull(mutation);
-    checkNotNull(auditMessage);
-
-    /**
-     * Don't bother applying noop state changes.  If we end up modifying task state without a
-     * state transition (e.g. storing resource consumption of a running task), we need to find
-     * a different way to suppress noop transitions.
-     */
-    if (stateMachine.getState().getState() != status) {
-      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
-          new Function<IScheduledTask, IScheduledTask>() {
-            @Override public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask builder = task.newBuilder();
-              builder.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(status)
-                  .setMessage(auditMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(builder);
-            }
-          });
-      return stateMachine.transition(State.create(status, operation));
-    }
-
-    return false;
-  }
-
-  /**
-   * Fetch the current state from the state machine.
-   *
-   * @return The current state.
-   */
-  public synchronized ScheduleStatus getState() {
-    return stateMachine.getState().getState();
-  }
-
-  /**
-   * Gets the ID for the task that this state machine manages.
-   *
-   * @return The state machine's task ID.
-   */
-  public String getTaskId() {
-    return taskId;
-  }
-
-  /**
-   * Gets the previous state of this state machine.
-   *
-   * @return The state machine's previous state, or {@code null} if the state machine has not
-   *     transitioned since being created.
-   */
-  @Nullable
-  ScheduleStatus getPreviousState() {
-    return previousState;
-  }
-
-  @Override
-  public String toString() {
-    return getTaskId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java b/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
deleted file mode 100644
index d8de19c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.twitter.aurora.scheduler.state;
-
-import java.util.UUID;
-
-/**
- * Wraps {@link java.util.UUID#randomUUID()} to facilitate unit testing.
- */
-interface UUIDGenerator {
-  UUID createNew();
-
-  class UUIDGeneratorImpl implements UUIDGenerator {
-    @Override
-    public UUID createNew() {
-      return UUID.randomUUID();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java b/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index 6c5637d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
-  // Send an instruction for the runner of this task to kill the task.
-  KILL,
-  // Create a new state machine with a copy of this task.
-  RESCHEDULE,
-  // Update the task's state (schedule status) in the persistent store to match the state machine.
-  UPDATE_STATE,
-  // Delete this task from the persistent store.
-  DELETE,
-  // Increment the failure count for this task.
-  INCREMENT_FAILURES
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
deleted file mode 100644
index a52c5c5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.BindingAnnotation;
-
-import org.apache.mesos.Protos.Offer;
-
-import com.twitter.aurora.gen.Quota;
-import com.twitter.aurora.scheduler.async.OfferQueue;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.stats.SlotSizeCounter.ResourceSlotProvider;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Module to configure export of cluster-wide resource allocation and consumption statistics.
- */
-public class AsyncStatsModule extends AbstractModule {
-
-  @CmdLine(name = "async_task_stat_update_interval",
-      help = "Interval on which to try to update resource consumption stats.")
-  private static final Arg<Amount<Long, Time>> TASK_STAT_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "async_slot_stat_update_interval",
-      help = "Interval on which to try to update open slot stats.")
-  private static final Arg<Amount<Long, Time>> SLOT_STAT_INTERVAL =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface StatExecutor { }
-
-  @Override
-  protected void configure() {
-    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setNameFormat("AsyncStat-%d").setDaemon(true).build());
-
-    bind(TaskStatCalculator.class).in(Singleton.class);
-    bind(CachedCounters.class).in(Singleton.class);
-    bind(ResourceSlotProvider.class).to(OfferAdapter.class);
-    bind(SlotSizeCounter.class).in(Singleton.class);
-
-    bind(ScheduledExecutorService.class).annotatedWith(StatExecutor.class).toInstance(executor);
-    LifecycleModule.bindStartupAction(binder(), StatUpdater.class);
-  }
-
-  static class StatUpdater implements Command {
-    private final ScheduledExecutorService executor;
-    private final TaskStatCalculator taskStats;
-    private final SlotSizeCounter slotCounter;
-
-    @Inject
-    StatUpdater(
-        @StatExecutor ScheduledExecutorService executor,
-        TaskStatCalculator taskStats,
-        SlotSizeCounter slotCounter) {
-
-      this.executor = checkNotNull(executor);
-      this.taskStats = checkNotNull(taskStats);
-      this.slotCounter = checkNotNull(slotCounter);
-    }
-
-    @Override
-    public void execute() {
-      long taskInterval = TASK_STAT_INTERVAL.get().as(Time.SECONDS);
-      executor.scheduleAtFixedRate(taskStats, taskInterval, taskInterval, TimeUnit.SECONDS);
-      long slotInterval = SLOT_STAT_INTERVAL.get().as(Time.SECONDS);
-      executor.scheduleAtFixedRate(slotCounter, slotInterval, slotInterval, TimeUnit.SECONDS);
-    }
-  }
-
-  static class OfferAdapter implements ResourceSlotProvider {
-    private static final Function<Offer, IQuota> TO_QUOTA = new Function<Offer, IQuota>() {
-      @Override public IQuota apply(Offer offer) {
-        Resources resources = Resources.from(offer);
-        return IQuota.build(new Quota()
-            .setNumCpus(resources.getNumCpus())
-            .setRamMb(resources.getRam().as(Data.MB))
-            .setDiskMb(resources.getDisk().as(Data.MB)));
-      }
-    };
-
-    private final OfferQueue offerQueue;
-
-    @Inject
-    OfferAdapter(OfferQueue offerQueue) {
-      this.offerQueue = checkNotNull(offerQueue);
-    }
-
-    @Override
-    public Iterable<IQuota> get() {
-      Iterable<Offer> offers = offerQueue.getOffers();
-      return FluentIterable.from(offers).transform(TO_QUOTA);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java b/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
deleted file mode 100644
index 81d6811..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import com.twitter.common.stats.StatsProvider;
-
-/**
- * A cache of stats, allowing counters to be fetched and reused based on their names.
- */
-class CachedCounters {
-  private final LoadingCache<String, AtomicLong> cache;
-
-  @Inject
-  CachedCounters(final StatsProvider stats) {
-    cache = CacheBuilder.newBuilder().build(
-        new CacheLoader<String, AtomicLong>() {
-          @Override public AtomicLong load(String key) {
-            return stats.makeCounter(key);
-          }
-        }
-    );
-  }
-
-  AtomicLong get(String name) {
-    return cache.getUnchecked(name);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
deleted file mode 100644
index 7b96e86..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.StorageException;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Computes aggregate metrics about resource allocation and consumption in the scheduler.
- */
-public class ResourceCounter {
-  private final Storage storage;
-
-  @Inject
-  ResourceCounter(Storage storage) {
-    this.storage = Preconditions.checkNotNull(storage);
-  }
-
-  private Iterable<ITaskConfig> getTasks(Query.Builder query) throws StorageException {
-    return Iterables.transform(
-        Storage.Util.consistentFetchTasks(storage, query),
-        Tasks.SCHEDULED_TO_INFO);
-  }
-
-  /**
-   * Computes totals for each of the {@link MetricType}s.
-   *
-   * @return aggregates for each global metric type.
-   * @throws StorageException if there was a problem fetching tasks from storage.
-   */
-  public List<GlobalMetric> computeConsumptionTotals() throws StorageException {
-    List<GlobalMetric> counts = Arrays.asList(
-        new GlobalMetric(MetricType.TOTAL_CONSUMED),
-        new GlobalMetric(MetricType.DEDICATED_CONSUMED),
-        new GlobalMetric(MetricType.QUOTA_CONSUMED),
-        new GlobalMetric(MetricType.FREE_POOL_CONSUMED));
-
-    for (ITaskConfig task : getTasks(Query.unscoped().active())) {
-      for (GlobalMetric count : counts) {
-        count.accumulate(task);
-      }
-    }
-    return counts;
-  }
-
-  /**
-   * Computes total quota allocations.
-   *
-   * @return Total allocated quota.
-   * @throws StorageException if there was a problem fetching quotas from storage.
-   */
-  public Metric computeQuotaAllocationTotals() throws StorageException {
-    return storage.weaklyConsistentRead(new Work.Quiet<Metric>() {
-      @Override public Metric apply(StoreProvider storeProvider) {
-        Metric allocation = new Metric();
-        for (IQuota quota : storeProvider.getQuotaStore().fetchQuotas().values()) {
-          allocation.accumulate(quota);
-        }
-        return allocation;
-      }
-    });
-  }
-
-  /**
-   * Computes arbitrary resource aggregates based on a query, a filter, and a grouping function.
-   *
-   * @param query Query to select tasks for aggregation.
-   * @param filter Filter to apply on query result tasks.
-   * @param keyFunction Function to define aggregation groupings.
-   * @param <K> Key type.
-   * @return A map from the keys to their aggregates based on the tasks fetched.
-   * @throws StorageException if there was a problem fetching tasks from storage.
-   */
-  public <K> Map<K, Metric> computeAggregates(
-      Query.Builder query,
-      Predicate<ITaskConfig> filter,
-      Function<ITaskConfig, K> keyFunction) throws StorageException {
-
-    LoadingCache<K, Metric> metrics = CacheBuilder.newBuilder()
-        .build(new CacheLoader<K, Metric>() {
-          @Override public Metric load(K key) {
-            return new Metric();
-          }
-        });
-    for (ITaskConfig task : Iterables.filter(getTasks(query), filter)) {
-      metrics.getUnchecked(keyFunction.apply(task)).accumulate(task);
-    }
-    return metrics.asMap();
-  }
-
-  public enum MetricType {
-    TOTAL_CONSUMED(Predicates.<ITaskConfig>alwaysTrue()),
-    DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return ConfigurationManager.isDedicated(task);
-      }
-    }),
-    QUOTA_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return task.isProduction();
-      }
-    }),
-    FREE_POOL_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return !ConfigurationManager.isDedicated(task) && !task.isProduction();
-      }
-    });
-
-    public final Predicate<ITaskConfig> filter;
-
-    MetricType(Predicate<ITaskConfig> filter) {
-      this.filter = filter;
-    }
-  }
-
-  public static class GlobalMetric extends Metric {
-    public final MetricType type;
-
-    public GlobalMetric(MetricType type) {
-      this.type = type;
-    }
-
-    @Override
-    protected void accumulate(ITaskConfig task) {
-      if (type.filter.apply(task)) {
-        super.accumulate(task);
-      }
-    }
-  }
-
-  public static class Metric {
-    private long cpu = 0;
-    private long ramMb = 0;
-    private long diskMb = 0;
-
-    public Metric() {
-      this.cpu = 0;
-      this.ramMb = 0;
-      this.diskMb = 0;
-    }
-
-    public Metric(Metric copy) {
-      this.cpu = copy.cpu;
-      this.ramMb = copy.ramMb;
-      this.diskMb = copy.diskMb;
-    }
-
-    protected void accumulate(ITaskConfig task) {
-      cpu += task.getNumCpus();
-      ramMb += task.getRamMb();
-      diskMb += task.getDiskMb();
-    }
-
-    protected void accumulate(IQuota quota) {
-      cpu += quota.getNumCpus();
-      ramMb += quota.getRamMb();
-      diskMb += quota.getDiskMb();
-    }
-
-    public long getCpu() {
-      return cpu;
-    }
-
-    public long getRamGb() {
-      return ramMb / 1024;
-    }
-
-    public long getDiskGb() {
-      return diskMb / 1024;
-    }
-  }
-}