You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:33 UTC

[40/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
deleted file mode 100644
index 438c023..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.ReadWriteLockManager;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A storage implementation comprised of individual in-memory store implementations.
- * <p>
- * This storage has a global read-write lock, which is used when invoking
- * {@link #consistentRead(Work)} and {@link #write(MutateWork)}.  However, no locks are used at this
- * level for {@link #weaklyConsistentRead(Work)}. It is the responsibility of the
- * individual stores to ensure that read operations are thread-safe (optimally supporting
- * concurrency).  Store implementations may assume that all methods invoked on {@code Mutable}
- * store interfaces are protected by the global write lock, and thus invoked serially.
- */
-public class MemStorage implements Storage {
-  private final AtomicLong readLockWaitNanos = Stats.exportLong("read_lock_wait_nanos");
-  private final AtomicLong writeLockWaitNanos = Stats.exportLong("write_lock_wait_nanos");
-
-  private final MutableStoreProvider storeProvider;
-  private final ReadWriteLockManager lockManager = new ReadWriteLockManager();
-
-  @Inject
-  MemStorage(
-      final SchedulerStore.Mutable schedulerStore,
-      final JobStore.Mutable jobStore,
-      final TaskStore.Mutable taskStore,
-      final LockStore.Mutable lockStore,
-      final QuotaStore.Mutable quotaStore,
-      final AttributeStore.Mutable attributeStore) {
-
-    storeProvider = new MutableStoreProvider() {
-      @Override public SchedulerStore.Mutable getSchedulerStore() {
-        return schedulerStore;
-      }
-
-      @Override public JobStore.Mutable getJobStore() {
-        return jobStore;
-      }
-
-      @Override public TaskStore getTaskStore() {
-        return taskStore;
-      }
-
-      @Override public TaskStore.Mutable getUnsafeTaskStore() {
-        return taskStore;
-      }
-
-      @Override public LockStore.Mutable getLockStore() {
-        return lockStore;
-      }
-
-      @Override public QuotaStore.Mutable getQuotaStore() {
-        return quotaStore;
-      }
-
-      @Override public AttributeStore.Mutable getAttributeStore() {
-        return attributeStore;
-      }
-    };
-  }
-
-  /**
-   * Creates a new empty in-memory storage for use in testing.
-   */
-  @VisibleForTesting
-  public static MemStorage newEmptyStorage() {
-    return new MemStorage(
-        new MemSchedulerStore(),
-        new MemJobStore(),
-        new MemTaskStore(),
-        new MemLockStore(),
-        new MemQuotaStore(),
-        new MemAttributeStore());
-  }
-
-  @Timed("mem_storage_consistent_read_operation")
-  @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    checkNotNull(work);
-
-    long lockStartNanos = System.nanoTime();
-    boolean topLevelOperation = lockManager.readLock();
-    if (topLevelOperation) {
-      readLockWaitNanos.addAndGet(System.nanoTime() - lockStartNanos);
-    }
-    try {
-      return work.apply(storeProvider);
-    } finally {
-      lockManager.readUnlock();
-    }
-  }
-
-  @Timed("mem_storage_weakly_consistent_read_operation")
-  @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
-    return work.apply(storeProvider);
-  }
-
-  @Timed("mem_storage_write_operation")
-  @Override
-  public <T, E extends Exception> T write(MutateWork<T, E> work)
-      throws StorageException, E {
-
-    checkNotNull(work);
-
-    long lockStartNanos = System.nanoTime();
-    boolean topLevelOperation = lockManager.writeLock();
-    if (topLevelOperation) {
-      writeLockWaitNanos.addAndGet(System.nanoTime() - lockStartNanos);
-    }
-    try {
-      return work.apply(storeProvider);
-    } finally {
-      lockManager.writeUnlock();
-    }
-  }
-
-  @Override
-  public void snapshot() {
-    // No-op.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
deleted file mode 100644
index a99aa12..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.Volatile;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Binding module for an in-memory storage system.
- * <p>
- * Exposes bindings for storage components:
- * <ul>
- *   <li>{@link com.twitter.aurora.scheduler.storage.Storage}</li>
- *   <li>Keyed with keys provided by the provided{@code keyFactory}:</li>
- *     <ul>
- *       <li>{@link com.twitter.aurora.scheduler.storage.SchedulerStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.JobStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.TaskStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.LockStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.QuotaStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.AttributeStore}</li>
- *     </ul>
- * </ul>
- */
-public final class MemStorageModule extends PrivateModule {
-
-  private final KeyFactory keyFactory;
-
-  public MemStorageModule(KeyFactory keyFactory) {
-    this.keyFactory = checkNotNull(keyFactory);
-  }
-
-  private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
-    bind(binding).to(impl);
-    bind(impl).in(Singleton.class);
-    Key<T> key = keyFactory.create(binding);
-    bind(key).to(impl);
-    expose(key);
-  }
-
-  @Override
-  protected void configure() {
-    Key<Storage> storageKey = keyFactory.create(Storage.class);
-    bind(storageKey).to(MemStorage.class);
-    expose(storageKey);
-    Key<Storage> exposedMemStorageKey = Key.get(Storage.class, Volatile.class);
-    bind(exposedMemStorageKey).to(MemStorage.class);
-    expose(exposedMemStorageKey);
-    bind(MemStorage.class).in(Singleton.class);
-
-    bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
-    bindStore(JobStore.Mutable.class, MemJobStore.class);
-    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
-    bindStore(LockStore.Mutable.class, MemLockStore.class);
-    bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
-    bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
deleted file mode 100644
index d02511f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory task store.
- */
-class MemTaskStore implements TaskStore.Mutable {
-
-  private static final Logger LOG = Logger.getLogger(MemTaskStore.class.getName());
-
-  @CmdLine(name = "slow_query_log_threshold",
-      help = "Log all queries that take at least this long to execute.")
-  private static final Arg<Amount<Long, Time>> SLOW_QUERY_LOG_THRESHOLD =
-      Arg.create(Amount.of(25L, Time.MILLISECONDS));
-
-  private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
-
-  private final Map<String, Task> tasks = Maps.newConcurrentMap();
-  private final Multimap<IJobKey, String> tasksByJobKey =
-      Multimaps.synchronizedSetMultimap(HashMultimap.<IJobKey, String>create());
-
-  // An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
-  // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
-  // rather than the task), but we intuit this detail here for performance reasons.
-  private final Interner<TaskConfig, String> configInterner = new Interner<TaskConfig, String>();
-
-  private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id");
-  private final AtomicLong taskQueriesByJob = Stats.exportLong("task_queries_by_job");
-  private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all");
-
-  @Timed("mem_storage_fetch_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
-    checkNotNull(query);
-
-    long start = System.nanoTime();
-    ImmutableSet<IScheduledTask> result = matches(query.get()).toSet();
-    long durationNanos = System.nanoTime() - start;
-    Level level = (durationNanos >= slowQueryThresholdNanos) ? Level.INFO : Level.FINE;
-    if (LOG.isLoggable(level)) {
-      Long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
-      LOG.log(level, "Query took " + time + " ms: " + query.get());
-    }
-
-    return result;
-  }
-
-  private final Function<IScheduledTask, Task> toTask =
-      new Function<IScheduledTask, Task>() {
-        @Override public Task apply(IScheduledTask task) {
-          return new Task(task, configInterner);
-        }
-      };
-
-  @Timed("mem_storage_save_tasks")
-  @Override
-  public void saveTasks(Set<IScheduledTask> newTasks) {
-    checkNotNull(newTasks);
-    Preconditions.checkState(Tasks.ids(newTasks).size() == newTasks.size(),
-        "Proposed new tasks would create task ID collision.");
-
-    Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
-    tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
-    tasksByJobKey.putAll(taskIdsByJobKey(canonicalized));
-  }
-
-  private Multimap<IJobKey, String> taskIdsByJobKey(Iterable<Task> toIndex) {
-    return Multimaps.transformValues(
-        Multimaps.index(toIndex, Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED)),
-        TO_ID);
-  }
-
-  @Timed("mem_storage_delete_all_tasks")
-  @Override
-  public void deleteAllTasks() {
-    tasks.clear();
-    tasksByJobKey.clear();
-    configInterner.clear();
-  }
-
-  @Timed("mem_storage_delete_tasks")
-  @Override
-  public void deleteTasks(Set<String> taskIds) {
-    checkNotNull(taskIds);
-
-    for (String id : taskIds) {
-      Task removed = tasks.remove(id);
-      if (removed != null) {
-        tasksByJobKey.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(removed.task), id);
-        configInterner.removeAssociation(removed.task.getAssignedTask().getTask().newBuilder(), id);
-      }
-    }
-  }
-
-  @Timed("mem_storage_mutate_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      Query.Builder query,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    checkNotNull(query);
-    checkNotNull(mutator);
-
-    ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
-    for (IScheduledTask original : matches(query.get())) {
-      IScheduledTask maybeMutated = mutator.apply(original);
-      if (!original.equals(maybeMutated)) {
-        Preconditions.checkState(
-            Tasks.id(original).equals(Tasks.id(maybeMutated)),
-            "A task's ID may not be mutated.");
-        tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
-        mutated.add(maybeMutated);
-      }
-    }
-
-    return mutated.build();
-  }
-
-  @Timed("mem_storage_unsafe_modify_in_place")
-  @Override
-  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
-    MorePreconditions.checkNotBlank(taskId);
-    checkNotNull(taskConfiguration);
-
-    Task stored = tasks.get(taskId);
-    if (stored == null) {
-      return false;
-    } else {
-      ScheduledTask updated = stored.task.newBuilder();
-      updated.getAssignedTask().setTask(taskConfiguration.newBuilder());
-      tasks.put(taskId, toTask.apply(IScheduledTask.build(updated)));
-      return true;
-    }
-  }
-
-  private static Predicate<IScheduledTask> queryFilter(final TaskQuery query) {
-    return new Predicate<IScheduledTask>() {
-      @Override public boolean apply(IScheduledTask task) {
-        ITaskConfig config = task.getAssignedTask().getTask();
-        if (query.getOwner() != null) {
-          if (!StringUtils.isBlank(query.getOwner().getRole())) {
-            if (!query.getOwner().getRole().equals(config.getOwner().getRole())) {
-              return false;
-            }
-          }
-          if (!StringUtils.isBlank(query.getOwner().getUser())) {
-            if (!query.getOwner().getUser().equals(config.getOwner().getUser())) {
-              return false;
-            }
-          }
-        }
-        if (query.getEnvironment() != null) {
-          if (!query.getEnvironment().equals(config.getEnvironment())) {
-            return false;
-          }
-        }
-        if (query.getJobName() != null) {
-          if (!query.getJobName().equals(config.getJobName())) {
-            return false;
-          }
-        }
-
-        if (query.getTaskIds() != null) {
-          if (!query.getTaskIds().contains(Tasks.id(task))) {
-            return false;
-          }
-        }
-
-        if (query.getStatusesSize() > 0) {
-          if (!query.getStatuses().contains(task.getStatus())) {
-            return false;
-          }
-        }
-        if (!StringUtils.isEmpty(query.getSlaveHost())) {
-          if (!query.getSlaveHost().equals(task.getAssignedTask().getSlaveHost())) {
-            return false;
-          }
-        }
-        if (query.getInstanceIdsSize() > 0) {
-          if (!query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) {
-            return false;
-          }
-        }
-
-        return true;
-      }
-    };
-  }
-
-  private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
-    ImmutableList.Builder<Task> matches = ImmutableList.builder();
-    for (String id : taskIds) {
-      Task match = tasks.get(id);
-      if (match != null) {
-        matches.add(match);
-      }
-    }
-    return matches.build();
-  }
-
-  private FluentIterable<IScheduledTask> matches(TaskQuery query) {
-    // Apply the query against the working set.
-    Iterable<Task> from;
-    Optional<IJobKey> jobKey = JobKeys.from(Query.arbitrary(query));
-    if (query.isSetTaskIds()) {
-      taskQueriesById.incrementAndGet();
-      from = fromIdIndex(query.getTaskIds());
-    } else if (jobKey.isPresent()) {
-      taskQueriesByJob.incrementAndGet();
-      Collection<String> taskIds = tasksByJobKey.get(jobKey.get());
-      if (taskIds == null) {
-        from = ImmutableList.of();
-      } else {
-        from = fromIdIndex(taskIds);
-      }
-    } else {
-      taskQueriesAll.incrementAndGet();
-      from = tasks.values();
-    }
-
-    return FluentIterable.from(from).transform(TO_SCHEDULED).filter(queryFilter(query));
-  }
-
-  private static final Function<Task, IScheduledTask> TO_SCHEDULED =
-      new Function<Task, IScheduledTask>() {
-        @Override public IScheduledTask apply(Task task) {
-          return task.task;
-        }
-      };
-
-  private static final Function<Task, String> TO_ID =
-      Functions.compose(Tasks.SCHEDULED_TO_ID, TO_SCHEDULED);
-
-  private static class Task {
-    private final IScheduledTask task;
-
-    Task(IScheduledTask task, Interner<TaskConfig, String> interner) {
-      interner.removeAssociation(task.getAssignedTask().getTask().newBuilder(), Tasks.id(task));
-      TaskConfig canonical = interner.addAssociation(
-          task.getAssignedTask().getTask().newBuilder(),
-          Tasks.id(task));
-      ScheduledTask builder = task.newBuilder();
-      builder.getAssignedTask().setTask(canonical);
-      this.task = IScheduledTask.build(builder);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
deleted file mode 100644
index cf1b057..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-
-import org.apache.thrift.TBase;
-
-/**
- * Utility class for common operations amongst in-memory store implementations.
- */
-final class Util {
-
-  private Util() {
-    // Utility class.
-  }
-
-  /**
-   * Creates a function that performs a 'deep copy' on a thrift struct of a specific type.  The
-   * resulting copied objects will be exactly identical to the original.  Mutations to the original
-   * object will not be reflected in the copy, and vice versa.
-   *
-   * @return A copier for the provided type of thrift structs.
-   */
-  static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
-    return new Function<T, T>() {
-      @Override public T apply(@Nullable T input) {
-        if (input == null) {
-          return null;
-        }
-
-        @SuppressWarnings("unchecked")
-        T t = (T) input.deepCopy();
-        return t;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
deleted file mode 100644
index d735828..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.testing;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.easymock.Capture;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expect;
-
-/**
- * Auxiliary class to simplify testing against a mocked storage.  This allows callers to directly
- * set up call expectations on individual stores rather than writing plumbing code to deal with
- * operations and {@link StoreProvider}.
- */
-public class StorageTestUtil {
-
-  public final StoreProvider storeProvider;
-  public final MutableStoreProvider mutableStoreProvider;
-  public final TaskStore.Mutable taskStore;
-  public final QuotaStore.Mutable quotaStore;
-  public final AttributeStore.Mutable attributeStore;
-  public final JobStore.Mutable jobStore;
-  public final LockStore.Mutable lockStore;
-  public final SchedulerStore.Mutable schedulerStore;
-  public final NonVolatileStorage storage;
-
-  /**
-   * Creates a new storage test utility.
-   *
-   * @param easyMock Mocking framework to use for setting up mocks and expectations.
-   */
-  public StorageTestUtil(EasyMockTest easyMock) {
-    this.storeProvider = easyMock.createMock(StoreProvider.class);
-    this.mutableStoreProvider = easyMock.createMock(MutableStoreProvider.class);
-    this.taskStore = easyMock.createMock(TaskStore.Mutable.class);
-    this.quotaStore = easyMock.createMock(QuotaStore.Mutable.class);
-    this.attributeStore = easyMock.createMock(AttributeStore.Mutable.class);
-    this.jobStore = easyMock.createMock(JobStore.Mutable.class);
-    this.lockStore = easyMock.createMock(LockStore.Mutable.class);
-    this.schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
-    this.storage = easyMock.createMock(NonVolatileStorage.class);
-  }
-
-  private <T> IExpectationSetters<T> expectConsistentRead() {
-    final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.consistentRead(capture(work))).andAnswer(new IAnswer<T>() {
-      @Override public T answer() {
-        return work.getValue().apply(storeProvider);
-      }
-    });
-  }
-
-  private <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
-    final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.weaklyConsistentRead(capture(work))).andAnswer(new IAnswer<T>() {
-      @Override public T answer() {
-        return work.getValue().apply(storeProvider);
-      }
-    });
-  }
-
-  private <T> IExpectationSetters<T> expectWriteOperation() {
-    final Capture<MutateWork<T, RuntimeException>> work = EasyMockTest.createCapture();
-    return expect(storage.write(capture(work))).andAnswer(new IAnswer<T>() {
-      @Override public T answer() {
-        return work.getValue().apply(mutableStoreProvider);
-      }
-    });
-  }
-
-  /**
-   * Expects any number of read or write operations.
-   */
-  public void expectOperations() {
-    expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
-    expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
-    expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
-    expect(storeProvider.getJobStore()).andReturn(jobStore).anyTimes();
-    expect(storeProvider.getLockStore()).andReturn(lockStore).anyTimes();
-    expect(storeProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
-    expect(mutableStoreProvider.getTaskStore()).andReturn(taskStore).anyTimes();
-    expect(mutableStoreProvider.getUnsafeTaskStore()).andReturn(taskStore).anyTimes();
-    expect(mutableStoreProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
-    expect(mutableStoreProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
-    expect(mutableStoreProvider.getJobStore()).andReturn(jobStore).anyTimes();
-    expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
-    expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
-    expectConsistentRead().anyTimes();
-    expectWeaklyConsistentRead().anyTimes();
-    expectWriteOperation().anyTimes();
-  }
-
-  public IExpectationSetters<?> expectTaskFetch(
-      Query.Builder query,
-      ImmutableSet<IScheduledTask> result) {
-
-    return expect(taskStore.fetchTasks(query)).andReturn(result);
-  }
-
-  public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
-    return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java b/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
deleted file mode 100644
index 2acf5c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.twitter.aurora.scheduler.thrift;
-
-import javax.inject.Inject;
-
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.server.TServlet;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-
-/**
- * A servlet that exposes the scheduler Thrift API over HTTP/JSON.
- */
-class SchedulerAPIServlet extends TServlet {
-
-  @Inject
-  SchedulerAPIServlet(AuroraAdmin.Iface schedulerThriftInterface) {
-    super(new AuroraAdmin.Processor<>(schedulerThriftInterface), new TJSONProtocol.Factory());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
deleted file mode 100644
index 3ff0f1c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ /dev/null
@@ -1,1000 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
-import com.twitter.aurora.gen.AcquireLockResult;
-import com.twitter.aurora.gen.AddInstancesConfig;
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.ConfigRewrite;
-import com.twitter.aurora.gen.DrainHostsResult;
-import com.twitter.aurora.gen.EndMaintenanceResult;
-import com.twitter.aurora.gen.GetJobsResult;
-import com.twitter.aurora.gen.GetQuotaResult;
-import com.twitter.aurora.gen.Hosts;
-import com.twitter.aurora.gen.InstanceConfigRewrite;
-import com.twitter.aurora.gen.InstanceKey;
-import com.twitter.aurora.gen.JobConfigRewrite;
-import com.twitter.aurora.gen.JobConfigValidation;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.JobKey;
-import com.twitter.aurora.gen.JobSummary;
-import com.twitter.aurora.gen.JobSummaryResult;
-import com.twitter.aurora.gen.ListBackupsResult;
-import com.twitter.aurora.gen.Lock;
-import com.twitter.aurora.gen.LockKey;
-import com.twitter.aurora.gen.LockValidation;
-import com.twitter.aurora.gen.MaintenanceStatusResult;
-import com.twitter.aurora.gen.PopulateJobResult;
-import com.twitter.aurora.gen.QueryRecoveryResult;
-import com.twitter.aurora.gen.Quota;
-import com.twitter.aurora.gen.Response;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.Result;
-import com.twitter.aurora.gen.RewriteConfigsRequest;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduleStatusResult;
-import com.twitter.aurora.gen.SessionKey;
-import com.twitter.aurora.gen.StartMaintenanceResult;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
-import com.twitter.aurora.scheduler.quota.Quotas;
-import com.twitter.aurora.scheduler.state.CronJobManager;
-import com.twitter.aurora.scheduler.state.LockManager;
-import com.twitter.aurora.scheduler.state.LockManager.LockException;
-import com.twitter.aurora.scheduler.state.MaintenanceController;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.backup.Recovery;
-import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryException;
-import com.twitter.aurora.scheduler.storage.backup.StorageBackup;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
-import com.twitter.aurora.scheduler.thrift.auth.Requires;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.BackoffHelper;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.auth.SessionValidator.SessionContext;
-import static com.twitter.aurora.gen.ResponseCode.AUTH_FAILED;
-import static com.twitter.aurora.gen.ResponseCode.ERROR;
-import static com.twitter.aurora.gen.ResponseCode.INVALID_REQUEST;
-import static com.twitter.aurora.gen.ResponseCode.LOCK_ERROR;
-import static com.twitter.aurora.gen.ResponseCode.OK;
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Aurora scheduler thrift server implementation.
- * <p>
- * Interfaces between users and the scheduler to access/modify jobs and perform cluster
- * administration tasks.
- */
-@DecoratedThrift
-class SchedulerThriftInterface implements AuroraAdmin.Iface {
-  private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());
-
-  @CmdLine(name = "kill_task_initial_backoff",
-      help = "Initial backoff delay while waiting for the tasks to transition to KILLED.")
-  private static final Arg<Amount<Long, Time>> KILL_TASK_INITIAL_BACKOFF =
-      Arg.create(Amount.of(1L, Time.SECONDS));
-
-  @CmdLine(name = "kill_task_max_backoff",
-      help = "Max backoff delay while waiting for the tasks to transition to KILLED.")
-  private static final Arg<Amount<Long, Time>> KILL_TASK_MAX_BACKOFF =
-      Arg.create(Amount.of(30L, Time.SECONDS));
-
-  private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
-      new Function<ITaskConfig, String>() {
-        @Override public String apply(ITaskConfig task) {
-          return task.getOwner().getRole();
-        }
-      },
-      Tasks.SCHEDULED_TO_INFO);
-
-  private final Storage storage;
-  private final SchedulerCore schedulerCore;
-  private final LockManager lockManager;
-  private final CapabilityValidator sessionValidator;
-  private final StorageBackup backup;
-  private final Recovery recovery;
-  private final MaintenanceController maintenance;
-  private final CronJobManager cronJobManager;
-  private final Amount<Long, Time> killTaskInitialBackoff;
-  private final Amount<Long, Time> killTaskMaxBackoff;
-
-  @Inject
-  SchedulerThriftInterface(
-      Storage storage,
-      SchedulerCore schedulerCore,
-      LockManager lockManager,
-      CapabilityValidator sessionValidator,
-      StorageBackup backup,
-      Recovery recovery,
-      CronJobManager cronJobManager,
-      MaintenanceController maintenance) {
-
-    this(storage,
-        schedulerCore,
-        lockManager,
-        sessionValidator,
-        backup,
-        recovery,
-        maintenance,
-        cronJobManager,
-        KILL_TASK_INITIAL_BACKOFF.get(),
-        KILL_TASK_MAX_BACKOFF.get());
-  }
-
-  @VisibleForTesting
-  SchedulerThriftInterface(
-      Storage storage,
-      SchedulerCore schedulerCore,
-      LockManager lockManager,
-      CapabilityValidator sessionValidator,
-      StorageBackup backup,
-      Recovery recovery,
-      MaintenanceController maintenance,
-      CronJobManager cronJobManager,
-      Amount<Long, Time> initialBackoff,
-      Amount<Long, Time> maxBackoff) {
-
-    this.storage = checkNotNull(storage);
-    this.schedulerCore = checkNotNull(schedulerCore);
-    this.lockManager = checkNotNull(lockManager);
-    this.sessionValidator = checkNotNull(sessionValidator);
-    this.backup = checkNotNull(backup);
-    this.recovery = checkNotNull(recovery);
-    this.maintenance = checkNotNull(maintenance);
-    this.cronJobManager = checkNotNull(cronJobManager);
-    this.killTaskInitialBackoff = checkNotNull(initialBackoff);
-    this.killTaskMaxBackoff = checkNotNull(maxBackoff);
-  }
-
-  @Override
-  public Response createJob(
-      JobConfiguration mutableJob,
-      @Nullable Lock mutableLock,
-      SessionKey session) {
-
-    IJobConfiguration job = IJobConfiguration.build(mutableJob);
-    IJobKey jobKey = JobKeys.assertValid(job.getKey());
-    checkNotNull(session);
-
-    Response response = new Response();
-
-    try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
-    } catch (AuthFailedException e) {
-      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-    }
-
-    try {
-      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
-      schedulerCore.createJob(sanitized);
-      response.setResponseCode(OK)
-          .setMessage(String.format("%d new tasks pending for job %s",
-              sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
-    } catch (LockException e) {
-      response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (TaskDescriptionException | ScheduleException e) {
-      response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response replaceCronTemplate(
-      JobConfiguration mutableConfig,
-      @Nullable Lock mutableLock,
-      SessionKey session) {
-
-    checkNotNull(mutableConfig);
-    IJobConfiguration job = IJobConfiguration.build(mutableConfig);
-    IJobKey jobKey = JobKeys.assertValid(job.getKey());
-    checkNotNull(session);
-
-    Response response = new Response();
-    try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
-    } catch (AuthFailedException e) {
-      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-    }
-
-    try {
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
-      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-
-      if (!cronJobManager.hasJob(jobKey)) {
-        return response.setResponseCode(INVALID_REQUEST).setMessage(
-            "No cron template found for the given key: " + jobKey);
-      }
-      cronJobManager.updateJob(sanitized);
-      return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
-
-    } catch (LockException e) {
-      return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (TaskDescriptionException | ScheduleException e) {
-      return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
-    }
-  }
-
-  @Override
-  public Response populateJobConfig(JobConfiguration description, JobConfigValidation validation) {
-
-    checkNotNull(description);
-
-    Response response = new Response();
-    try {
-      SanitizedConfiguration sanitized =
-          SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
-
-      // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
-      if (validation != null && validation == JobConfigValidation.RUN_FILTERS) {
-        schedulerCore.validateJobResources(sanitized);
-      }
-
-      PopulateJobResult result = new PopulateJobResult()
-          .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values()));
-      response.setResult(Result.populateJobResult(result))
-          .setResponseCode(OK)
-          .setMessage("Tasks populated");
-    } catch (TaskDescriptionException | ScheduleException e) {
-      response.setResponseCode(INVALID_REQUEST)
-          .setMessage("Invalid configuration: " + e.getMessage());
-    }
-    return response;
-  }
-
-  @Override
-  public Response startCronJob(JobKey mutableJobKey, SessionKey session) {
-    checkNotNull(session);
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
-
-    Response response = new Response();
-    try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-    } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-      return response;
-    }
-
-    try {
-      schedulerCore.startCronJob(jobKey);
-      response.setResponseCode(OK).setMessage("Cron run started.");
-    } catch (ScheduleException e) {
-      response.setResponseCode(INVALID_REQUEST)
-          .setMessage("Failed to start cron job - " + e.getMessage());
-    } catch (TaskDescriptionException e) {
-      response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
-    }
-
-    return response;
-  }
-
-  // TODO(William Farner): Provide status information about cron jobs here.
-  @Override
-  public Response getTasksStatus(TaskQuery query) {
-    checkNotNull(query);
-
-    Set<IScheduledTask> tasks =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query));
-
-    Response response = new Response();
-
-    if (tasks.isEmpty()) {
-      response.setResponseCode(INVALID_REQUEST)
-          .setMessage("No tasks found for query: " + query);
-    } else {
-      response.setResponseCode(OK)
-          .setResult(Result.scheduleStatusResult(
-              new ScheduleStatusResult().setTasks(IScheduledTask.toBuildersList(tasks))));
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response getJobSummary() {
-    Set<IScheduledTask> tasks = Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped());
-    Multimap<String, IJobKey> jobsByRole = Multimaps.index(
-        FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY),
-        JobKeys.TO_ROLE);
-
-    Multimap<String, IJobKey> cronJobsByRole = Multimaps.index(
-        FluentIterable.from(cronJobManager.getJobs()).transform(JobKeys.FROM_CONFIG),
-        JobKeys.TO_ROLE);
-
-    List<JobSummary> jobSummaries = Lists.newLinkedList();
-    for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) {
-      JobSummary summary = new JobSummary();
-      summary.setRole(role);
-      summary.setJobCount(jobsByRole.get(role).size());
-      summary.setCronJobCount(cronJobsByRole.get(role).size());
-      jobSummaries.add(summary);
-    }
-
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.jobSummaryResult(new JobSummaryResult(jobSummaries)));
-  }
-
-  @Override
-  public Response getJobs(@Nullable String maybeNullRole) {
-    Optional<String> ownerRole = Optional.fromNullable(maybeNullRole);
-
-
-    // Ensure we only return one JobConfiguration for each JobKey.
-    Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap();
-
-    // Query the task store, find immediate jobs, and synthesize a JobConfiguration for them.
-    // This is necessary because the ImmediateJobManager doesn't store jobs directly and
-    // ImmediateJobManager#getJobs always returns an empty Collection.
-    Query.Builder scope = ownerRole.isPresent()
-        ? Query.roleScoped(ownerRole.get())
-        : Query.unscoped();
-    Multimap<IJobKey, IScheduledTask> tasks =
-        Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, scope.active()));
-
-    jobs.putAll(Maps.transformEntries(tasks.asMap(),
-        new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() {
-          @Override
-          public IJobConfiguration transformEntry(
-              IJobKey jobKey,
-              Collection<IScheduledTask> tasks) {
-
-            // Pick an arbitrary task for each immediate job. The chosen task might not be the most
-            // recent if the job is in the middle of an update or some shards have been selectively
-            // created.
-            TaskConfig firstTask = tasks.iterator().next().getAssignedTask().getTask().newBuilder();
-            return IJobConfiguration.build(new JobConfiguration()
-                .setKey(jobKey.newBuilder())
-                .setOwner(firstTask.getOwner())
-                .setTaskConfig(firstTask)
-                .setInstanceCount(tasks.size()));
-          }
-        }));
-
-    // Get cron jobs directly from the manager. Do this after querying the task store so the real
-    // template JobConfiguration for a cron job will overwrite the synthesized one that could have
-    // been created above.
-    Predicate<IJobConfiguration> configFilter = ownerRole.isPresent()
-        ? Predicates.compose(Predicates.equalTo(ownerRole.get()), JobKeys.CONFIG_TO_ROLE)
-        : Predicates.<IJobConfiguration>alwaysTrue();
-    jobs.putAll(Maps.uniqueIndex(
-        FluentIterable.from(cronJobManager.getJobs()).filter(configFilter),
-        JobKeys.FROM_CONFIG));
-
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.getJobsResult(new GetJobsResult()
-            .setConfigs(IJobConfiguration.toBuildersSet(jobs.values()))));
-  }
-
-  private void validateLockForTasks(Optional<ILock> lock, Iterable<IScheduledTask> tasks)
-      throws LockException {
-
-    ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks)
-        .transform(Tasks.SCHEDULED_TO_JOB_KEY)
-        .toSet();
-
-    // Validate lock against every unique job key derived from the tasks.
-    for (IJobKey key : uniqueKeys) {
-      lockManager.validateIfLocked(ILockKey.build(LockKey.job(key.newBuilder())), lock);
-    }
-  }
-
-  private SessionContext validateSessionKeyForTasks(
-      SessionKey session,
-      TaskQuery taskQuery,
-      Iterable<IScheduledTask> tasks) throws AuthFailedException {
-
-    // Authenticate the session against any affected roles, always including the role for a
-    // role-scoped query.  This papers over the implementation detail that dormant cron jobs are
-    // authenticated this way.
-    ImmutableSet.Builder<String> targetRoles = ImmutableSet.<String>builder()
-        .addAll(FluentIterable.from(tasks).transform(GET_ROLE));
-    if (taskQuery.isSetOwner()) {
-      targetRoles.add(taskQuery.getOwner().getRole());
-    }
-    return sessionValidator.checkAuthenticated(session, targetRoles.build());
-  }
-
-  private Optional<SessionContext> isAdmin(SessionKey session) {
-    try {
-      return Optional.of(
-          sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED));
-    } catch (AuthFailedException e) {
-      return Optional.absent();
-    }
-  }
-
-  @Override
-  public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) {
-    checkNotNull(query);
-    checkNotNull(session);
-
-    Response response = new Response();
-
-    if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) {
-      response.setResponseCode(INVALID_REQUEST).setMessage(
-          String.format("Invalid job name: '%s'", query.getJobName()));
-      return response;
-    }
-
-    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query));
-
-    Optional<SessionContext> context = isAdmin(session);
-    if (context.isPresent()) {
-      LOG.info("Granting kill query to admin user: " + query);
-    } else {
-      try {
-        context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
-      } catch (AuthFailedException e) {
-        response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-        return response;
-      }
-    }
-
-    try {
-      validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks);
-      schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity());
-    } catch (LockException e) {
-      return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (ScheduleException e) {
-      return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
-    }
-
-    // TODO(William Farner): Move this into the client.
-    BackoffHelper backoff = new BackoffHelper(killTaskInitialBackoff, killTaskMaxBackoff, true);
-    final Query.Builder activeQuery = Query.arbitrary(query.setStatuses(Tasks.ACTIVE_STATES));
-    try {
-      backoff.doUntilSuccess(new Supplier<Boolean>() {
-        @Override public Boolean get() {
-          Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, activeQuery);
-          if (tasks.isEmpty()) {
-            LOG.info("Tasks all killed, done waiting.");
-            return true;
-          } else {
-            LOG.info("Jobs not yet killed, waiting...");
-            return false;
-          }
-        }
-      });
-      response.setResponseCode(OK).setMessage("Tasks killed.");
-    } catch (InterruptedException e) {
-      LOG.warning("Interrupted while trying to kill tasks: " + e);
-      Thread.currentThread().interrupt();
-      response.setResponseCode(ERROR).setMessage("killTasks thread was interrupted.");
-    } catch (BackoffHelper.BackoffStoppedException e) {
-      response.setResponseCode(ERROR).setMessage("Tasks were not killed in time.");
-    }
-    return response;
-  }
-
-  @Override
-  public Response restartShards(
-      JobKey mutableJobKey,
-      Set<Integer> shardIds,
-      Lock mutableLock,
-      SessionKey session) {
-
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
-    MorePreconditions.checkNotBlank(shardIds);
-    checkNotNull(session);
-
-    Response response = new Response();
-    SessionContext context;
-    try {
-      context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-    } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-      return response;
-    }
-
-    try {
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-      schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
-      response.setResponseCode(OK).setMessage("Shards are restarting.");
-    } catch (LockException e) {
-      response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
-    } catch (ScheduleException e) {
-      response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage());
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response getQuota(final String ownerRole) {
-    checkNotBlank(ownerRole);
-
-    IQuota quota = storage.consistentRead(new Work.Quiet<IQuota>() {
-      @Override public IQuota apply(StoreProvider storeProvider) {
-        return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
-      }
-    });
-
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.getQuotaResult(new GetQuotaResult()
-            .setQuota(quota.newBuilder())));
-  }
-
-  @Override
-  public Response startMaintenance(Hosts hosts, SessionKey session) {
-      return new Response()
-          .setResponseCode(OK)
-          .setResult(Result.startMaintenanceResult(new StartMaintenanceResult()
-              .setStatuses(maintenance.startMaintenance(hosts.getHostNames()))));
-  }
-
-  @Override
-  public Response drainHosts(Hosts hosts, SessionKey session) {
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.drainHostsResult(new DrainHostsResult()
-            .setStatuses(maintenance.drain(hosts.getHostNames()))));
-  }
-
-  @Override
-  public Response maintenanceStatus(Hosts hosts, SessionKey session) {
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.maintenanceStatusResult(new MaintenanceStatusResult()
-            .setStatuses(maintenance.getStatus(hosts.getHostNames()))));
-  }
-
-  @Override
-  public Response endMaintenance(Hosts hosts, SessionKey session) {
-      return new Response()
-          .setResponseCode(OK)
-          .setResult(Result.endMaintenanceResult(new EndMaintenanceResult()
-              .setStatuses(maintenance.endMaintenance(hosts.getHostNames()))));
-  }
-
-  @Requires(whitelist = Capability.PROVISIONER)
-  @Override
-  public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
-    checkNotBlank(ownerRole);
-    checkNotNull(quota);
-    checkNotNull(session);
-
-    // TODO(Kevin Sweeney): Input validation for Quota.
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
-      }
-    });
-
-    return new Response().setResponseCode(OK).setMessage("Quota applied.");
-  }
-
-  @Override
-  public Response forceTaskState(
-      String taskId,
-      ScheduleStatus status,
-      SessionKey session) {
-
-    checkNotBlank(taskId);
-    checkNotNull(status);
-    checkNotNull(session);
-
-    Response response = new Response();
-    SessionContext context;
-    try {
-      // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
-      context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
-    } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-      return response;
-    }
-
-    schedulerCore.setTaskStatus(
-        Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
-    return new Response().setResponseCode(OK).setMessage("Transition attempted.");
-  }
-
-  @Override
-  public Response performBackup(SessionKey session) {
-    backup.backupNow();
-    return new Response().setResponseCode(OK);
-  }
-
-  @Override
-  public Response listBackups(SessionKey session) {
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.listBackupsResult(new ListBackupsResult()
-            .setBackups(recovery.listBackups())));
-  }
-
-  @Override
-  public Response stageRecovery(String backupId, SessionKey session) {
-    Response response = new Response().setResponseCode(OK);
-    try {
-      recovery.stage(backupId);
-    } catch (RecoveryException e) {
-      response.setResponseCode(ERROR).setMessage(e.getMessage());
-      LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e);
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response queryRecovery(TaskQuery query, SessionKey session) {
-    Response response = new Response();
-    try {
-      response.setResponseCode(OK)
-          .setResult(Result.queryRecoveryResult(new QueryRecoveryResult()
-              .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
-    } catch (RecoveryException e) {
-      response.setResponseCode(ERROR).setMessage(e.getMessage());
-      LOG.log(Level.WARNING, "Failed to query recovery: " + e, e);
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response deleteRecoveryTasks(TaskQuery query, SessionKey session) {
-    Response response = new Response().setResponseCode(OK);
-    try {
-      recovery.deleteTasks(Query.arbitrary(query));
-    } catch (RecoveryException e) {
-      response.setResponseCode(ERROR).setMessage(e.getMessage());
-      LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e);
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response commitRecovery(SessionKey session) {
-    Response response = new Response().setResponseCode(OK);
-    try {
-      recovery.commit();
-    } catch (RecoveryException e) {
-      response.setResponseCode(ERROR).setMessage(e.getMessage());
-    }
-
-    return response;
-  }
-
-  @Override
-  public Response unloadRecovery(SessionKey session) {
-    recovery.unload();
-    return new Response().setResponseCode(OK);
-  }
-
-  @Override
-  public Response snapshot(SessionKey session) {
-    Response response = new Response();
-    try {
-      storage.snapshot();
-      return response.setResponseCode(OK).setMessage("Compaction successful.");
-    } catch (Storage.StorageException e) {
-      LOG.log(Level.WARNING, "Requested snapshot failed.", e);
-      return response.setResponseCode(ERROR).setMessage(e.getMessage());
-    }
-  }
-
-  private static Multimap<String, IJobConfiguration> jobsByKey(JobStore jobStore, IJobKey jobKey) {
-    ImmutableMultimap.Builder<String, IJobConfiguration> matches = ImmutableMultimap.builder();
-    for (String managerId : jobStore.fetchManagerIds()) {
-      for (IJobConfiguration job : jobStore.fetchJobs(managerId)) {
-        if (job.getKey().equals(jobKey)) {
-          matches.put(managerId, job);
-        }
-      }
-    }
-    return matches.build();
-  }
-
-  @Override
-  public Response rewriteConfigs(
-      final RewriteConfigsRequest request,
-      SessionKey session) {
-
-    if (request.getRewriteCommandsSize() == 0) {
-      return new Response()
-        .setResponseCode(ResponseCode.ERROR)
-        .setMessage("No rewrite commands provided.");
-    }
-
-    return storage.write(new MutateWork.Quiet<Response>() {
-      @Override public Response apply(MutableStoreProvider storeProvider) {
-        List<String> errors = Lists.newArrayList();
-
-        for (ConfigRewrite command : request.getRewriteCommands()) {
-          Optional<String> error = rewriteConfig(command, storeProvider);
-          if (error.isPresent()) {
-            errors.add(error.get());
-          }
-        }
-
-        Response resp = new Response();
-        if (!errors.isEmpty()) {
-          resp.setResponseCode(ResponseCode.WARNING).setMessage(Joiner.on(", ").join(errors));
-        } else {
-          resp.setResponseCode(OK).setMessage("All rewrites completed successfully.");
-        }
-        return resp;
-      }
-    });
-  }
-
-  private Optional<String> rewriteConfig(
-      ConfigRewrite command,
-      MutableStoreProvider storeProvider) {
-
-    Optional<String> error = Optional.absent();
-    switch (command.getSetField()) {
-      case JOB_REWRITE:
-        JobConfigRewrite jobRewrite = command.getJobRewrite();
-        IJobConfiguration existingJob = IJobConfiguration.build(jobRewrite.getOldJob());
-        IJobConfiguration rewrittenJob;
-        try {
-          rewrittenJob = ConfigurationManager.validateAndPopulate(
-              IJobConfiguration.build(jobRewrite.getRewrittenJob()));
-        } catch (TaskDescriptionException e) {
-          // We could add an error here, but this is probably a hint of something wrong in
-          // the client that's causing a bad configuration to be applied.
-          throw Throwables.propagate(e);
-        }
-        if (!existingJob.getKey().equals(rewrittenJob.getKey())) {
-          error = Optional.of("Disallowing rewrite attempting to change job key.");
-        } else if (!existingJob.getOwner().equals(rewrittenJob.getOwner())) {
-          error = Optional.of("Disallowing rewrite attempting to change job owner.");
-        } else {
-          JobStore.Mutable jobStore = storeProvider.getJobStore();
-          Multimap<String, IJobConfiguration> matches =
-              jobsByKey(jobStore, existingJob.getKey());
-          switch (matches.size()) {
-            case 0:
-              error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
-              break;
-
-            case 1:
-              Map.Entry<String, IJobConfiguration> match =
-                  Iterables.getOnlyElement(matches.entries());
-              IJobConfiguration storedJob = match.getValue();
-              if (!storedJob.equals(existingJob)) {
-                error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
-              } else {
-                jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
-              }
-              break;
-
-            default:
-              error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
-          }
-        }
-        break;
-
-      case INSTANCE_REWRITE:
-        InstanceConfigRewrite instanceRewrite = command.getInstanceRewrite();
-        InstanceKey instanceKey = instanceRewrite.getInstanceKey();
-        Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(
-            Query.instanceScoped(IJobKey.build(instanceKey.getJobKey()),
-                instanceKey.getInstanceId())
-                .active());
-        Optional<IAssignedTask> task =
-            Optional.fromNullable(Iterables.getOnlyElement(tasks, null))
-                .transform(Tasks.SCHEDULED_TO_ASSIGNED);
-        if (!task.isPresent()) {
-          error = Optional.of("No active task found for " + instanceKey);
-        } else if (!task.get().getTask().newBuilder().equals(instanceRewrite.getOldTask())) {
-          error = Optional.of("CAS compare failed for " + instanceKey);
-        } else {
-          ITaskConfig newConfiguration = ITaskConfig.build(
-              ConfigurationManager.applyDefaultsIfUnset(instanceRewrite.getRewrittenTask()));
-          boolean changed = storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(
-              task.get().getTaskId(), newConfiguration);
-          if (!changed) {
-            error = Optional.of("Did not change " + task.get().getTaskId());
-          }
-        }
-        break;
-
-      default:
-        throw new IllegalArgumentException("Unhandled command type " + command.getSetField());
-    }
-
-    return error;
-  }
-
-  @Override
-  public Response getVersion() {
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.getVersionResult(CURRENT_API_VERSION));
-  }
-
-  @Override
-  public Response addInstances(
-      AddInstancesConfig config,
-      @Nullable Lock mutableLock,
-      SessionKey session) {
-
-    checkNotNull(config);
-    checkNotNull(session);
-    checkNotBlank(config.getInstanceIds());
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
-
-    Response resp = new Response();
-    try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-      ITaskConfig task = ConfigurationManager.validateAndPopulate(
-          ITaskConfig.build(config.getTaskConfig()));
-
-      if (cronJobManager.hasJob(jobKey)) {
-        return resp.setResponseCode(INVALID_REQUEST)
-            .setMessage("Cron jobs are not supported here.");
-      }
-
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
-      schedulerCore.addInstances(jobKey, ImmutableSet.copyOf(config.getInstanceIds()), task);
-      return resp.setResponseCode(OK).setMessage("Successfully added instances.");
-    } catch (AuthFailedException e) {
-      return resp.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-    } catch (LockException e) {
-      return resp.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (TaskDescriptionException | ScheduleException e) {
-      return resp.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
-    }
-  }
-
-  private String getRoleFromLockKey(ILockKey lockKey) {
-    switch (lockKey.getSetField()) {
-      case JOB:
-        JobKeys.assertValid(lockKey.getJob());
-        return lockKey.getJob().getRole();
-      default:
-        throw new IllegalArgumentException("Unhandled LockKey: " + lockKey.getSetField());
-    }
-  }
-
-  @Override
-  public Response acquireLock(LockKey mutableLockKey, SessionKey session) {
-    checkNotNull(mutableLockKey);
-    checkNotNull(session);
-
-    ILockKey lockKey = ILockKey.build(mutableLockKey);
-    Response response = new Response();
-
-    try {
-      SessionContext context = sessionValidator.checkAuthenticated(
-          session,
-          ImmutableSet.of(getRoleFromLockKey(lockKey)));
-
-      ILock lock = lockManager.acquireLock(lockKey, context.getIdentity());
-      response.setResult(Result.acquireLockResult(
-          new AcquireLockResult().setLock(lock.newBuilder())));
-
-      return response.setResponseCode(OK).setMessage("Lock has been acquired.");
-    } catch (AuthFailedException e) {
-      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-    } catch (LockException e) {
-      return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
-    }
-  }
-
-  @Override
-  public Response releaseLock(Lock mutableLock, LockValidation validation, SessionKey session) {
-    checkNotNull(mutableLock);
-    checkNotNull(validation);
-    checkNotNull(session);
-
-    Response response = new Response();
-    ILock lock = ILock.build(mutableLock);
-
-    try {
-      sessionValidator.checkAuthenticated(
-          session,
-          ImmutableSet.of(getRoleFromLockKey(lock.getKey())));
-
-      if (validation == LockValidation.CHECKED) {
-        lockManager.validateIfLocked(lock.getKey(), Optional.of(lock));
-      }
-      lockManager.releaseLock(lock);
-      return response.setResponseCode(OK).setMessage("Lock has been released.");
-    } catch (AuthFailedException e) {
-      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-    } catch (LockException e) {
-      return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
-    }
-  }
-
-  @VisibleForTesting
-  static Optional<String> transitionMessage(String user) {
-    return Optional.of("Transition forced by " + user);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
deleted file mode 100644
index c6e9b18..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Optional;
-
-/**
- * Container for thrift server configuration options.
- */
-public interface ThriftConfiguration {
-  /**
-   * Gets a stream for the thrift socket SSL key if this server is configured to use SSL.
-   *
-   * @return A stream that contains the SSL key data if SSL is enabled, absent otherwise.
-   * @throws IOException If the stream could not be opened.
-   */
-  Optional<? extends InputStream> getSslKeyStream() throws IOException;
-
-  /**
-   * Gets the port that the thrift server should listen on.
-   *
-   * @return Thrift server port.
-   */
-  int getServingPort();
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
deleted file mode 100644
index cca9053..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.scheduler.thrift.aop.AopModule;
-import com.twitter.common.application.http.Registration;
-import com.twitter.common.application.modules.LifecycleModule;
-
-/**
- * Binding module to configure a thrift server.
- */
-public class ThriftModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
-    bind(ThriftServer.class).in(Singleton.class);
-    LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
-
-    Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
-
-    install(new AopModule());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
deleted file mode 100644
index 7b9abd1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.net.ServerSocket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-
-import com.twitter.thrift.Status;
-
-class ThriftServer {
-  private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
-
-  private TServer server = null;
-
-  // Current health status of the server.
-  private Status status = Status.STARTING;
-
-  /**
-   * Starts the server.
-   * This may be called at any point except when the server is already alive.  That is, it's
-   * allowable to start, stop, and re-start the server.
-   *
-   * @param socket The socket to use.
-   * @param processor The processor to handle requests.
-   */
-  public synchronized void start(ServerSocket socket, TProcessor processor) {
-    Preconditions.checkNotNull(socket);
-    Preconditions.checkNotNull(processor);
-    Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
-    setStatus(Status.ALIVE);
-    TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket))
-        .processor(processor)
-        .protocolFactory(new TBinaryProtocol.Factory(false, true));
-
-    final TServer starting = new TThreadPoolServer(args);
-    server = starting;
-    LOG.info("Starting thrift server on port " + socket.getLocalPort());
-
-    Thread listeningThread = new ThreadFactoryBuilder().setDaemon(false).build()
-        .newThread(new Runnable() {
-          @Override public void run() {
-            try {
-              starting.serve();
-            } catch (Throwable t) {
-              LOG.log(Level.WARNING,
-                  "Uncaught exception while attempting to handle service requests: " + t, t);
-              setStatus(Status.DEAD);
-            }
-          }
-    });
-
-    listeningThread.start();
-  }
-
-  private synchronized void setStatus(Status status) {
-    LOG.info("Moving from status " + this.status + " to " + status);
-    this.status = status;
-  }
-
-  /**
-   * Attempts to shut down the server.
-   * The server may be shut down at any time, though the request will be ignored if the server is
-   * already stopped.
-   */
-  public synchronized void shutdown() {
-    if (status == Status.STOPPED) {
-      LOG.info("Server already stopped, shutdown request ignored.");
-      return;
-    }
-
-    LOG.info("Received shutdown request, stopping server.");
-    setStatus(Status.STOPPING);
-
-    // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
-    // is stopped.  Might want to allow a sleep period for the active requests to be completed.
-
-    if (server != null) {
-      server.stop();
-    }
-
-    server = null;
-    setStatus(Status.STOPPED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
deleted file mode 100644
index 6743060..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import com.google.common.base.Optional;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.AuroraAdmin.Iface;
-import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
-import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
-import com.twitter.common.base.Command;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Service launcher that starts up and registers the scheduler thrift server as a primary service
- * for the application.
- */
-class ThriftServerLauncher implements ServiceRunner {
-
-  private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
-
-  private final ThriftConfiguration configuration;
-
-  // Security is enforced via file permissions, not via this password, for what it's worth.
-  private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
-
-  private final Iface schedulerThriftInterface;
-  private final ThriftServer schedulerThriftServer;
-
-  @Inject
-  ThriftServerLauncher(
-      Iface schedulerThriftInterface,
-      ThriftServer schedulerThriftServer,
-      ThriftConfiguration configuration) {
-
-    this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
-    this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
-    this.configuration = checkNotNull(configuration);
-  }
-
-  @Override
-  public LocalService launch() {
-    ServerSocket socket = getServerSocket();
-    schedulerThriftServer.start(
-        socket,
-        new AuroraAdmin.Processor<>(schedulerThriftInterface));
-
-    Command shutdown = new Command() {
-      @Override public void execute() {
-        LOG.info("Stopping thrift server.");
-        schedulerThriftServer.shutdown();
-      }
-    };
-
-    return LocalService.primaryService(socket.getLocalPort(), shutdown);
-  }
-
-  private ServerSocket getServerSocket() {
-    try {
-      Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
-      if (!sslKeyStream.isPresent()) {
-        LOG.warning("Running Thrift Server without SSL.");
-        return new ServerSocket(configuration.getServingPort());
-      } else {
-        // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
-        KeyStore ks = KeyStore.getInstance("JKS");
-        ks.load(sslKeyStream.get(), SSL_KEYFILE_PASSWORD.toCharArray());
-
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-        kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
-
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(kmf.getKeyManagers(), null, null);
-
-        SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
-        SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
-            configuration.getServingPort());
-        serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
-        serverSocket.setNeedClientAuth(false);
-        return serverSocket;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to read key file.", e);
-    } catch (GeneralSecurityException e) {
-      throw new RuntimeException("SSL setup failed.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
deleted file mode 100644
index d66a2b2..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.gen.Response;
-
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-
-class APIVersionInterceptor implements MethodInterceptor {
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    Response resp = (Response) invocation.proceed();
-    if (resp.version == null) {
-      resp.setVersion(CURRENT_API_VERSION);
-    }
-    return resp;
-  }
-}