You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:21 UTC

[28/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
new file mode 100644
index 0000000..d735828
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.testing;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Test helper that bundles a mocked {@link NonVolatileStorage} with mocked store providers and
+ * mutable stores.  Tests can record expectations directly on the individual stores instead of
+ * hand-writing the plumbing that routes storage operations through a {@link StoreProvider}.
+ */
+public class StorageTestUtil {
+
+  public final StoreProvider storeProvider;
+  public final MutableStoreProvider mutableStoreProvider;
+  public final TaskStore.Mutable taskStore;
+  public final QuotaStore.Mutable quotaStore;
+  public final AttributeStore.Mutable attributeStore;
+  public final JobStore.Mutable jobStore;
+  public final LockStore.Mutable lockStore;
+  public final SchedulerStore.Mutable schedulerStore;
+  public final NonVolatileStorage storage;
+
+  /**
+   * Creates a new storage test utility, obtaining every mock from the given test fixture.
+   *
+   * @param easyMock Mocking framework to use for setting up mocks and expectations.
+   */
+  public StorageTestUtil(EasyMockTest easyMock) {
+    // Providers first, then the individual stores, then the storage facade itself.
+    storeProvider = easyMock.createMock(StoreProvider.class);
+    mutableStoreProvider = easyMock.createMock(MutableStoreProvider.class);
+    taskStore = easyMock.createMock(TaskStore.Mutable.class);
+    quotaStore = easyMock.createMock(QuotaStore.Mutable.class);
+    attributeStore = easyMock.createMock(AttributeStore.Mutable.class);
+    jobStore = easyMock.createMock(JobStore.Mutable.class);
+    lockStore = easyMock.createMock(LockStore.Mutable.class);
+    schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
+    storage = easyMock.createMock(NonVolatileStorage.class);
+  }
+
+  /**
+   * Builds an answer that applies the captured read operation to {@link #storeProvider}.
+   */
+  private <T> IAnswer<T> runCapturedRead(final Capture<Work<T, RuntimeException>> captured) {
+    return new IAnswer<T>() {
+      @Override public T answer() {
+        return captured.getValue().apply(storeProvider);
+      }
+    };
+  }
+
+  /**
+   * Expects a consistent read and answers it by running the supplied work.
+   */
+  private <T> IExpectationSetters<T> expectConsistentRead() {
+    Capture<Work<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    return expect(storage.consistentRead(capture(captured)))
+        .andAnswer(runCapturedRead(captured));
+  }
+
+  /**
+   * Expects a weakly-consistent read and answers it by running the supplied work.
+   */
+  private <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
+    Capture<Work<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    return expect(storage.weaklyConsistentRead(capture(captured)))
+        .andAnswer(runCapturedRead(captured));
+  }
+
+  /**
+   * Expects a write and answers it by applying the supplied work to
+   * {@link #mutableStoreProvider}.
+   */
+  private <T> IExpectationSetters<T> expectWriteOperation() {
+    final Capture<MutateWork<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    IAnswer<T> runCapturedWrite = new IAnswer<T>() {
+      @Override public T answer() {
+        return captured.getValue().apply(mutableStoreProvider);
+      }
+    };
+    return expect(storage.write(capture(captured))).andAnswer(runCapturedWrite);
+  }
+
+  /**
+   * Expects any number of read or write operations.
+   */
+  public void expectOperations() {
+    // Read-only provider hands out the shared store mocks.
+    expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
+    expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
+    expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
+    expect(storeProvider.getJobStore()).andReturn(jobStore).anyTimes();
+    expect(storeProvider.getLockStore()).andReturn(lockStore).anyTimes();
+    expect(storeProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
+
+    // Mutable provider hands out the very same mocks, including for the unsafe task store.
+    expect(mutableStoreProvider.getTaskStore()).andReturn(taskStore).anyTimes();
+    expect(mutableStoreProvider.getUnsafeTaskStore()).andReturn(taskStore).anyTimes();
+    expect(mutableStoreProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
+    expect(mutableStoreProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
+    expect(mutableStoreProvider.getJobStore()).andReturn(jobStore).anyTimes();
+    expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
+    expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
+
+    // Storage operations execute the caller's work against the providers above.
+    expectConsistentRead().anyTimes();
+    expectWeaklyConsistentRead().anyTimes();
+    expectWriteOperation().anyTimes();
+  }
+
+  /**
+   * Expects a task fetch for the given query on the task store.
+   *
+   * @param query Query the task store is expected to receive.
+   * @param result Tasks to return from the fetch.
+   * @return The expectation, so callers may further constrain it (e.g. call count).
+   */
+  public IExpectationSetters<?> expectTaskFetch(
+      Query.Builder query,
+      ImmutableSet<IScheduledTask> result) {
+
+    return expect(taskStore.fetchTasks(query)).andReturn(result);
+  }
+
+  /**
+   * Varargs convenience form of {@link #expectTaskFetch(Query.Builder, ImmutableSet)}.
+   *
+   * @param query Query the task store is expected to receive.
+   * @param result Tasks to return from the fetch.
+   * @return The expectation, so callers may further constrain it (e.g. call count).
+   */
+  public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
+    ImmutableSet<IScheduledTask> fetched = ImmutableSet.copyOf(result);
+    return expectTaskFetch(query, fetched);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
new file mode 100644
index 0000000..2acf5c8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
@@ -0,0 +1,19 @@
+package com.twitter.aurora.scheduler.thrift;
+
+import javax.inject.Inject;
+
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.server.TServlet;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+
+/**
+ * A servlet that exposes the scheduler Thrift API over HTTP/JSON.
+ * <p>
+ * The servlet is constructed with an {@link AuroraAdmin.Processor} wrapping the injected
+ * {@link AuroraAdmin.Iface} and a {@link TJSONProtocol.Factory} as its protocol factory.
+ */
+class SchedulerAPIServlet extends TServlet {
+
+  /**
+   * @param schedulerThriftInterface API implementation handed to the Thrift processor.
+   */
+  @Inject
+  SchedulerAPIServlet(AuroraAdmin.Iface schedulerThriftInterface) {
+    super(new AuroraAdmin.Processor<>(schedulerThriftInterface), new TJSONProtocol.Factory());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
new file mode 100644
index 0000000..3ff0f1c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -0,0 +1,1000 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
+import com.twitter.aurora.gen.AcquireLockResult;
+import com.twitter.aurora.gen.AddInstancesConfig;
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.ConfigRewrite;
+import com.twitter.aurora.gen.DrainHostsResult;
+import com.twitter.aurora.gen.EndMaintenanceResult;
+import com.twitter.aurora.gen.GetJobsResult;
+import com.twitter.aurora.gen.GetQuotaResult;
+import com.twitter.aurora.gen.Hosts;
+import com.twitter.aurora.gen.InstanceConfigRewrite;
+import com.twitter.aurora.gen.InstanceKey;
+import com.twitter.aurora.gen.JobConfigRewrite;
+import com.twitter.aurora.gen.JobConfigValidation;
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.JobKey;
+import com.twitter.aurora.gen.JobSummary;
+import com.twitter.aurora.gen.JobSummaryResult;
+import com.twitter.aurora.gen.ListBackupsResult;
+import com.twitter.aurora.gen.Lock;
+import com.twitter.aurora.gen.LockKey;
+import com.twitter.aurora.gen.LockValidation;
+import com.twitter.aurora.gen.MaintenanceStatusResult;
+import com.twitter.aurora.gen.PopulateJobResult;
+import com.twitter.aurora.gen.QueryRecoveryResult;
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.gen.Response;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.Result;
+import com.twitter.aurora.gen.RewriteConfigsRequest;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduleStatusResult;
+import com.twitter.aurora.gen.SessionKey;
+import com.twitter.aurora.gen.StartMaintenanceResult;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskQuery;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.quota.Quotas;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.state.LockManager;
+import com.twitter.aurora.scheduler.state.LockManager.LockException;
+import com.twitter.aurora.scheduler.state.MaintenanceController;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.backup.Recovery;
+import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryException;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
+import com.twitter.aurora.scheduler.thrift.auth.Requires;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffHelper;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.auth.SessionValidator.SessionContext;
+import static com.twitter.aurora.gen.ResponseCode.AUTH_FAILED;
+import static com.twitter.aurora.gen.ResponseCode.ERROR;
+import static com.twitter.aurora.gen.ResponseCode.INVALID_REQUEST;
+import static com.twitter.aurora.gen.ResponseCode.LOCK_ERROR;
+import static com.twitter.aurora.gen.ResponseCode.OK;
+import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Aurora scheduler thrift server implementation.
+ * <p>
+ * Interfaces between users and the scheduler to access/modify jobs and perform cluster
+ * administration tasks.
+ */
+@DecoratedThrift
+class SchedulerThriftInterface implements AuroraAdmin.Iface {
+  private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());
+
+  // Command line argument; defaults to 1 second.  Read once by the injected constructor and used
+  // as the initial delay of the backoff loop in killTasks.
+  @CmdLine(name = "kill_task_initial_backoff",
+      help = "Initial backoff delay while waiting for the tasks to transition to KILLED.")
+  private static final Arg<Amount<Long, Time>> KILL_TASK_INITIAL_BACKOFF =
+      Arg.create(Amount.of(1L, Time.SECONDS));
+
+  // Command line argument; defaults to 30 seconds.  Read once by the injected constructor and used
+  // as the maximum delay of the backoff loop in killTasks.
+  @CmdLine(name = "kill_task_max_backoff",
+      help = "Max backoff delay while waiting for the tasks to transition to KILLED.")
+  private static final Arg<Amount<Long, Time>> KILL_TASK_MAX_BACKOFF =
+      Arg.create(Amount.of(30L, Time.SECONDS));
+
+  // Maps a scheduled task to the role of its owner: first applies Tasks.SCHEDULED_TO_INFO to
+  // obtain the task's ITaskConfig, then reads the owner's role from that config.
+  private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
+      new Function<ITaskConfig, String>() {
+        @Override public String apply(ITaskConfig task) {
+          return task.getOwner().getRole();
+        }
+      },
+      Tasks.SCHEDULED_TO_INFO);
+
+  // Collaborators; all are null-checked and assigned exactly once in the constructor.
+  private final Storage storage;
+  private final SchedulerCore schedulerCore;
+  private final LockManager lockManager;
+  // Used for both authentication (checkAuthenticated) and capability checks (checkAuthorized).
+  private final CapabilityValidator sessionValidator;
+  private final StorageBackup backup;
+  private final Recovery recovery;
+  private final MaintenanceController maintenance;
+  private final CronJobManager cronJobManager;
+  // Backoff bounds used by killTasks while polling for tasks to leave the active states.
+  private final Amount<Long, Time> killTaskInitialBackoff;
+  private final Amount<Long, Time> killTaskMaxBackoff;
+
+  /**
+   * Injection entry point.  Delegates to the full constructor, supplying the kill-task backoff
+   * bounds from the {@code kill_task_initial_backoff} and {@code kill_task_max_backoff} command
+   * line arguments.
+   * <p>
+   * Note that this constructor declares {@code cronJobManager} before {@code maintenance}, while
+   * the delegate declares them in the opposite order; the delegation below passes each argument
+   * in the position the delegate expects.
+   */
+  @Inject
+  SchedulerThriftInterface(
+      Storage storage,
+      SchedulerCore schedulerCore,
+      LockManager lockManager,
+      CapabilityValidator sessionValidator,
+      StorageBackup backup,
+      Recovery recovery,
+      CronJobManager cronJobManager,
+      MaintenanceController maintenance) {
+
+    this(storage,
+        schedulerCore,
+        lockManager,
+        sessionValidator,
+        backup,
+        recovery,
+        maintenance,
+        cronJobManager,
+        KILL_TASK_INITIAL_BACKOFF.get(),
+        KILL_TASK_MAX_BACKOFF.get());
+  }
+
+  /**
+   * Full constructor, exposed so tests can supply their own kill-task backoff bounds instead of
+   * the command line defaults.  Every argument is required; a {@code null} for any of them raises
+   * a {@link NullPointerException} via {@code checkNotNull}.
+   *
+   * @param initialBackoff Initial delay of the backoff loop used by {@code killTasks}.
+   * @param maxBackoff Maximum delay of the backoff loop used by {@code killTasks}.
+   */
+  @VisibleForTesting
+  SchedulerThriftInterface(
+      Storage storage,
+      SchedulerCore schedulerCore,
+      LockManager lockManager,
+      CapabilityValidator sessionValidator,
+      StorageBackup backup,
+      Recovery recovery,
+      MaintenanceController maintenance,
+      CronJobManager cronJobManager,
+      Amount<Long, Time> initialBackoff,
+      Amount<Long, Time> maxBackoff) {
+
+    this.storage = checkNotNull(storage);
+    this.schedulerCore = checkNotNull(schedulerCore);
+    this.lockManager = checkNotNull(lockManager);
+    this.sessionValidator = checkNotNull(sessionValidator);
+    this.backup = checkNotNull(backup);
+    this.recovery = checkNotNull(recovery);
+    this.maintenance = checkNotNull(maintenance);
+    this.cronJobManager = checkNotNull(cronJobManager);
+    this.killTaskInitialBackoff = checkNotNull(initialBackoff);
+    this.killTaskMaxBackoff = checkNotNull(maxBackoff);
+  }
+
+  /**
+   * Creates a job after authenticating the session against the job owner's role, sanitizing the
+   * configuration and validating the optional lock.
+   *
+   * @param mutableJob Job configuration to create; must not be null.
+   * @param mutableLock Lock held by the caller for this job, or null when no lock is held.
+   * @param session Caller's session; must not be null.
+   * @return {@code OK} on success, {@code AUTH_FAILED} when authentication fails,
+   *     {@code LOCK_ERROR} when lock validation fails, or {@code INVALID_REQUEST} when the
+   *     configuration is rejected.
+   */
+  @Override
+  public Response createJob(
+      JobConfiguration mutableJob,
+      @Nullable Lock mutableLock,
+      SessionKey session) {
+
+    // Fail fast with an explicit check, matching replaceCronTemplate, rather than relying on
+    // whatever happens to dereference the argument first.
+    checkNotNull(mutableJob);
+    IJobConfiguration job = IJobConfiguration.build(mutableJob);
+    IJobKey jobKey = JobKeys.assertValid(job.getKey());
+    checkNotNull(session);
+
+    Response response = new Response();
+
+    try {
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    }
+
+    try {
+      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+
+      lockManager.validateIfLocked(
+          ILockKey.build(LockKey.job(jobKey.newBuilder())),
+          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+
+      schedulerCore.createJob(sanitized);
+      response.setResponseCode(OK)
+          .setMessage(String.format("%d new tasks pending for job %s",
+              sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
+    } catch (LockException e) {
+      response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+    } catch (TaskDescriptionException | ScheduleException e) {
+      response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+    }
+
+    return response;
+  }
+
+  /**
+   * Replaces the stored template of an existing cron job.
+   *
+   * @param mutableConfig Replacement job configuration; must not be null.
+   * @param mutableLock Lock held by the caller for this job, or null when no lock is held.
+   * @param session Caller's session; must not be null.
+   * @return {@code OK} when the template was replaced, {@code AUTH_FAILED} when authentication
+   *     fails, {@code LOCK_ERROR} when lock validation fails, or {@code INVALID_REQUEST} when the
+   *     configuration is rejected or no cron job exists for the key.
+   */
+  @Override
+  public Response replaceCronTemplate(
+      JobConfiguration mutableConfig,
+      @Nullable Lock mutableLock,
+      SessionKey session) {
+
+    checkNotNull(mutableConfig);
+    IJobConfiguration job = IJobConfiguration.build(mutableConfig);
+    IJobKey jobKey = JobKeys.assertValid(job.getKey());
+    checkNotNull(session);
+
+    Response response = new Response();
+    try {
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    }
+
+    try {
+      ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+      Optional<ILock> heldLock = Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER);
+      lockManager.validateIfLocked(lockKey, heldLock);
+
+      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+
+      if (cronJobManager.hasJob(jobKey)) {
+        cronJobManager.updateJob(sanitized);
+        response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
+      } else {
+        response.setResponseCode(INVALID_REQUEST).setMessage(
+            "No cron template found for the given key: " + jobKey);
+      }
+    } catch (LockException e) {
+      response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+    } catch (TaskDescriptionException | ScheduleException e) {
+      response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+    }
+
+    return response;
+  }
+
+  /**
+   * Sanitizes a job configuration and returns the fully populated task configs it expands to.
+   *
+   * @param description Job configuration to populate; must not be null.
+   * @param validation Optional extra validation to run; when equal to
+   *     {@link JobConfigValidation#RUN_FILTERS} the job's resources are validated as well.  A null
+   *     value requests no extra validation.
+   * @return {@code OK} with the populated task configs, or {@code INVALID_REQUEST} when the
+   *     configuration is rejected.
+   */
+  @Override
+  public Response populateJobConfig(JobConfiguration description, JobConfigValidation validation) {
+
+    checkNotNull(description);
+
+    Response response = new Response();
+    try {
+      SanitizedConfiguration sanitized =
+          SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
+
+      // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
+      // An identity comparison against an enum constant is already false for null, so no separate
+      // null guard is needed.
+      if (validation == JobConfigValidation.RUN_FILTERS) {
+        schedulerCore.validateJobResources(sanitized);
+      }
+
+      PopulateJobResult result = new PopulateJobResult()
+          .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values()));
+      response.setResult(Result.populateJobResult(result))
+          .setResponseCode(OK)
+          .setMessage("Tasks populated");
+    } catch (TaskDescriptionException | ScheduleException e) {
+      response.setResponseCode(INVALID_REQUEST)
+          .setMessage("Invalid configuration: " + e.getMessage());
+    }
+    return response;
+  }
+
+  /**
+   * Triggers an immediate run of a cron job.
+   *
+   * @param mutableJobKey Key of the cron job to start.
+   * @param session Caller's session; must not be null.
+   * @return {@code OK} when the run was started, {@code AUTH_FAILED} when authentication fails,
+   *     {@code INVALID_REQUEST} when scheduling fails, or {@code ERROR} when the task
+   *     description is invalid.
+   */
+  @Override
+  public Response startCronJob(JobKey mutableJobKey, SessionKey session) {
+    checkNotNull(session);
+    IJobKey cronKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+
+    Response response = new Response();
+    try {
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(cronKey.getRole()));
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    }
+
+    try {
+      schedulerCore.startCronJob(cronKey);
+      return response.setResponseCode(OK).setMessage("Cron run started.");
+    } catch (ScheduleException e) {
+      return response.setResponseCode(INVALID_REQUEST)
+          .setMessage("Failed to start cron job - " + e.getMessage());
+    } catch (TaskDescriptionException e) {
+      return response.setResponseCode(ERROR)
+          .setMessage("Invalid task description: " + e.getMessage());
+    }
+  }
+
+  // TODO(William Farner): Provide status information about cron jobs here.
+  /**
+   * Fetches the tasks matching a query using a weakly-consistent read.
+   *
+   * @param query Query to match tasks against; must not be null.
+   * @return {@code OK} with the matching tasks, or {@code INVALID_REQUEST} when nothing matches.
+   */
+  @Override
+  public Response getTasksStatus(TaskQuery query) {
+    checkNotNull(query);
+
+    Query.Builder scope = Query.arbitrary(query);
+    Set<IScheduledTask> matched = Storage.Util.weaklyConsistentFetchTasks(storage, scope);
+
+    Response response = new Response();
+    if (matched.isEmpty()) {
+      return response
+          .setResponseCode(INVALID_REQUEST)
+          .setMessage("No tasks found for query: " + query);
+    }
+
+    ScheduleStatusResult statuses =
+        new ScheduleStatusResult().setTasks(IScheduledTask.toBuildersList(matched));
+    return response.setResponseCode(OK).setResult(Result.scheduleStatusResult(statuses));
+  }
+
+  /**
+   * Summarizes, per role, how many distinct jobs have tasks in the task store and how many cron
+   * jobs are registered with the cron job manager.
+   *
+   * @return {@code OK} with one {@link JobSummary} per role that has at least one job or cron job.
+   */
+  @Override
+  public Response getJobSummary() {
+    Set<IScheduledTask> tasks = Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped());
+
+    // Collapse tasks to their distinct job keys before indexing.  Multimaps.index keeps one entry
+    // per input element, so indexing the per-task keys directly would make the job count equal
+    // the number of tasks in the role rather than the number of jobs.
+    Multimap<String, IJobKey> jobsByRole = Multimaps.index(
+        FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet(),
+        JobKeys.TO_ROLE);
+
+    Multimap<String, IJobKey> cronJobsByRole = Multimaps.index(
+        FluentIterable.from(cronJobManager.getJobs()).transform(JobKeys.FROM_CONFIG),
+        JobKeys.TO_ROLE);
+
+    List<JobSummary> jobSummaries = Lists.newArrayList();
+    for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) {
+      JobSummary summary = new JobSummary();
+      summary.setRole(role);
+      // Multimap#get returns an empty collection for a missing key, so roles present in only one
+      // of the two maps simply report a zero for the other count.
+      summary.setJobCount(jobsByRole.get(role).size());
+      summary.setCronJobCount(cronJobsByRole.get(role).size());
+      jobSummaries.add(summary);
+    }
+
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.jobSummaryResult(new JobSummaryResult(jobSummaries)));
+  }
+
+  /**
+   * Lists job configurations, optionally restricted to a single role.
+   *
+   * @param maybeNullRole Role to restrict the listing to, or null to list jobs for all roles.
+   * @return {@code OK} with one configuration per job key.
+   */
+  @Override
+  public Response getJobs(@Nullable String maybeNullRole) {
+    Optional<String> ownerRole = Optional.fromNullable(maybeNullRole);
+
+    // Keyed by job so that each JobKey contributes at most one JobConfiguration.
+    Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap();
+
+    // Query the task store, find immediate jobs, and synthesize a JobConfiguration for them.
+    // This is necessary because the ImmediateJobManager doesn't store jobs directly and
+    // ImmediateJobManager#getJobs always returns an empty Collection.
+    Query.Builder scope;
+    if (ownerRole.isPresent()) {
+      scope = Query.roleScoped(ownerRole.get());
+    } else {
+      scope = Query.unscoped();
+    }
+    Multimap<IJobKey, IScheduledTask> byJob =
+        Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, scope.active()));
+
+    for (Map.Entry<IJobKey, Collection<IScheduledTask>> entry : byJob.asMap().entrySet()) {
+      IJobKey jobKey = entry.getKey();
+      Collection<IScheduledTask> jobTasks = entry.getValue();
+
+      // Pick an arbitrary task for each immediate job. The chosen task might not be the most
+      // recent if the job is in the middle of an update or some shards have been selectively
+      // created.
+      TaskConfig firstTask = jobTasks.iterator().next().getAssignedTask().getTask().newBuilder();
+      JobConfiguration synthesized = new JobConfiguration()
+          .setKey(jobKey.newBuilder())
+          .setOwner(firstTask.getOwner())
+          .setTaskConfig(firstTask)
+          .setInstanceCount(jobTasks.size());
+      jobs.put(jobKey, IJobConfiguration.build(synthesized));
+    }
+
+    // Get cron jobs directly from the manager. Do this after querying the task store so the real
+    // template JobConfiguration for a cron job will overwrite the synthesized one that could have
+    // been created above.
+    Predicate<IJobConfiguration> configFilter;
+    if (ownerRole.isPresent()) {
+      configFilter =
+          Predicates.compose(Predicates.equalTo(ownerRole.get()), JobKeys.CONFIG_TO_ROLE);
+    } else {
+      configFilter = Predicates.<IJobConfiguration>alwaysTrue();
+    }
+    jobs.putAll(Maps.uniqueIndex(
+        FluentIterable.from(cronJobManager.getJobs()).filter(configFilter),
+        JobKeys.FROM_CONFIG));
+
+    GetJobsResult result =
+        new GetJobsResult().setConfigs(IJobConfiguration.toBuildersSet(jobs.values()));
+    return new Response().setResponseCode(OK).setResult(Result.getJobsResult(result));
+  }
+
+  /**
+   * Validates the given lock against the job of every supplied task.
+   *
+   * @param lock Lock held by the caller, if any.
+   * @param tasks Tasks whose jobs must be checked.
+   * @throws LockException If lock validation fails for any of the jobs.
+   */
+  private void validateLockForTasks(Optional<ILock> lock, Iterable<IScheduledTask> tasks)
+      throws LockException {
+
+    // De-duplicate first so each job is validated once, however many tasks it has.
+    Set<IJobKey> jobKeys =
+        ImmutableSet.copyOf(Iterables.transform(tasks, Tasks.SCHEDULED_TO_JOB_KEY));
+
+    for (IJobKey jobKey : jobKeys) {
+      ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+      lockManager.validateIfLocked(lockKey, lock);
+    }
+  }
+
+  /**
+   * Authenticates the session against the owner role of every supplied task, plus the role named
+   * by the query when the query has an owner set.
+   *
+   * @param session Caller's session.
+   * @param taskQuery Query that selected the tasks.
+   * @param tasks Tasks affected by the operation.
+   * @return The authenticated session context.
+   * @throws AuthFailedException If the session fails authentication.
+   */
+  private SessionContext validateSessionKeyForTasks(
+      SessionKey session,
+      TaskQuery taskQuery,
+      Iterable<IScheduledTask> tasks) throws AuthFailedException {
+
+    // Authenticate the session against any affected roles, always including the role for a
+    // role-scoped query.  This papers over the implementation detail that dormant cron jobs are
+    // authenticated this way.
+    ImmutableSet.Builder<String> targetRoles = ImmutableSet.builder();
+    for (IScheduledTask task : tasks) {
+      targetRoles.add(GET_ROLE.apply(task));
+    }
+    if (taskQuery.isSetOwner()) {
+      targetRoles.add(taskQuery.getOwner().getRole());
+    }
+
+    ImmutableSet<String> roles = targetRoles.build();
+    return sessionValidator.checkAuthenticated(session, roles);
+  }
+
+  /**
+   * Checks whether the session is authorized for the {@code ROOT} capability.
+   *
+   * @param session Caller's session.
+   * @return The session context when authorization succeeds, otherwise absent.
+   */
+  private Optional<SessionContext> isAdmin(SessionKey session) {
+    SessionContext rootContext;
+    try {
+      rootContext =
+          sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
+    } catch (AuthFailedException e) {
+      // Not an admin; callers fall back to per-role authentication.
+      return Optional.absent();
+    }
+    return Optional.of(rootContext);
+  }
+
+  /**
+   * Kills the tasks matching a query and then polls, with backoff, until no active task matches.
+   * <p>
+   * A session authorized for the {@code ROOT} capability may kill any tasks; any other session
+   * must authenticate against the roles of the affected tasks.
+   *
+   * @param query Query selecting the tasks to kill; must not be null.
+   * @param mutablelock Lock held by the caller, or null when no lock is held.
+   * @param session Caller's session; must not be null.
+   * @return {@code OK} once no active task matches, {@code INVALID_REQUEST} for a blank job name
+   *     or a scheduling failure, {@code AUTH_FAILED} when authentication fails,
+   *     {@code LOCK_ERROR} when lock validation fails, or {@code ERROR} when the wait is
+   *     interrupted or the backoff is stopped before the tasks are gone.
+   */
+  @Override
+  public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) {
+    checkNotNull(query);
+    checkNotNull(session);
+
+    Response response = new Response();
+
+    // A job name that is set but blank is rejected; an unset (null) job name is allowed.
+    if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) {
+      response.setResponseCode(INVALID_REQUEST).setMessage(
+          String.format("Invalid job name: '%s'", query.getJobName()));
+      return response;
+    }
+
+    // The matched tasks are needed up front to derive the roles and job locks to validate.
+    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query));
+
+    Optional<SessionContext> context = isAdmin(session);
+    if (context.isPresent()) {
+      LOG.info("Granting kill query to admin user: " + query);
+    } else {
+      try {
+        context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
+      } catch (AuthFailedException e) {
+        response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+        return response;
+      }
+    }
+
+    // context is always present here: either the admin check or the per-role check succeeded.
+    try {
+      validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks);
+      schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity());
+    } catch (LockException e) {
+      return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+    } catch (ScheduleException e) {
+      return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+    }
+
+    // TODO(William Farner): Move this into the client.
+    // NOTE(review): the third BackoffHelper argument is presumably "stop at max backoff", which
+    // would be what raises BackoffStoppedException below - confirm against BackoffHelper.
+    BackoffHelper backoff = new BackoffHelper(killTaskInitialBackoff, killTaskMaxBackoff, true);
+    // NOTE(review): setStatuses is invoked on the caller-supplied query object, so this
+    // presumably mutates the argument and replaces any statuses the caller requested - confirm
+    // that this is intended.
+    final Query.Builder activeQuery = Query.arbitrary(query.setStatuses(Tasks.ACTIVE_STATES));
+    try {
+      backoff.doUntilSuccess(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          // Succeeds once no task matching the query remains in an active state.
+          Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, activeQuery);
+          if (tasks.isEmpty()) {
+            LOG.info("Tasks all killed, done waiting.");
+            return true;
+          } else {
+            LOG.info("Jobs not yet killed, waiting...");
+            return false;
+          }
+        }
+      });
+      response.setResponseCode(OK).setMessage("Tasks killed.");
+    } catch (InterruptedException e) {
+      LOG.warning("Interrupted while trying to kill tasks: " + e);
+      // Restore the interrupt flag so code further up the stack can observe the interruption.
+      Thread.currentThread().interrupt();
+      response.setResponseCode(ERROR).setMessage("killTasks thread was interrupted.");
+    } catch (BackoffHelper.BackoffStoppedException e) {
+      response.setResponseCode(ERROR).setMessage("Tasks were not killed in time.");
+    }
+    return response;
+  }
+
+  /**
+   * Restarts the given shards of a job.
+   *
+   * @param mutableJobKey Key of the job whose shards should restart.
+   * @param shardIds Shards to restart; must not be empty.
+   * @param mutableLock Lock held by the caller for this job, or null when no lock is held.
+   * @param session Caller's session; must not be null.
+   * @return {@code OK} when the restart was issued, {@code AUTH_FAILED} when authentication
+   *     fails, {@code LOCK_ERROR} when lock validation fails, or {@code INVALID_REQUEST} when
+   *     scheduling fails.
+   */
+  @Override
+  public Response restartShards(
+      JobKey mutableJobKey,
+      Set<Integer> shardIds,
+      Lock mutableLock,
+      SessionKey session) {
+
+    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+    MorePreconditions.checkNotBlank(shardIds);
+    checkNotNull(session);
+
+    Response response = new Response();
+    SessionContext context;
+    try {
+      context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    }
+
+    try {
+      ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+      Optional<ILock> heldLock = Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER);
+      lockManager.validateIfLocked(lockKey, heldLock);
+      schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
+      return response.setResponseCode(OK).setMessage("Shards are restarting.");
+    } catch (LockException e) {
+      return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
+    } catch (ScheduleException e) {
+      return response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage());
+    }
+  }
+
+  /**
+   * Looks up the quota stored for a role, falling back to {@code Quotas.noQuota()} when the
+   * quota store has no entry for it.
+   *
+   * @param ownerRole Role to fetch the quota for, must not be blank.
+   * @return An OK response wrapping the quota.
+   */
+  @Override
+  public Response getQuota(final String ownerRole) {
+    checkNotBlank(ownerRole);
+
+    Work.Quiet<IQuota> fetchRoleQuota = new Work.Quiet<IQuota>() {
+      @Override public IQuota apply(StoreProvider storeProvider) {
+        return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
+      }
+    };
+    IQuota roleQuota = storage.consistentRead(fetchRoleQuota);
+
+    GetQuotaResult quotaResult = new GetQuotaResult().setQuota(roleQuota.newBuilder());
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.getQuotaResult(quotaResult));
+  }
+
+  /**
+   * Asks the maintenance controller to start maintenance on the given hosts.
+   *
+   * @param hosts Hosts to put into maintenance.
+   * @param session Session of the caller (not inspected in this method).
+   * @return An OK response carrying the statuses reported by the maintenance controller.
+   */
+  @Override
+  public Response startMaintenance(Hosts hosts, SessionKey session) {
+    StartMaintenanceResult started = new StartMaintenanceResult()
+        .setStatuses(maintenance.startMaintenance(hosts.getHostNames()));
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.startMaintenanceResult(started));
+  }
+
+  /**
+   * Asks the maintenance controller to drain the given hosts.
+   *
+   * @param hosts Hosts to drain.
+   * @param session Session of the caller (not inspected in this method).
+   * @return An OK response carrying the statuses reported by the maintenance controller.
+   */
+  @Override
+  public Response drainHosts(Hosts hosts, SessionKey session) {
+    DrainHostsResult drained = new DrainHostsResult()
+        .setStatuses(maintenance.drain(hosts.getHostNames()));
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.drainHostsResult(drained));
+  }
+
+  /**
+   * Fetches the maintenance status of the given hosts from the maintenance controller.
+   *
+   * @param hosts Hosts to look up.
+   * @param session Session of the caller (not inspected in this method).
+   * @return An OK response carrying the statuses reported by the maintenance controller.
+   */
+  @Override
+  public Response maintenanceStatus(Hosts hosts, SessionKey session) {
+    MaintenanceStatusResult current = new MaintenanceStatusResult()
+        .setStatuses(maintenance.getStatus(hosts.getHostNames()));
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.maintenanceStatusResult(current));
+  }
+
+  /**
+   * Asks the maintenance controller to end maintenance on the given hosts.
+   *
+   * @param hosts Hosts to take out of maintenance.
+   * @param session Session of the caller (not inspected in this method).
+   * @return An OK response carrying the statuses reported by the maintenance controller.
+   */
+  @Override
+  public Response endMaintenance(Hosts hosts, SessionKey session) {
+    EndMaintenanceResult ended = new EndMaintenanceResult()
+        .setStatuses(maintenance.endMaintenance(hosts.getHostNames()));
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.endMaintenanceResult(ended));
+  }
+
+  @Requires(whitelist = Capability.PROVISIONER)
+  @Override
+  public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
+    checkNotBlank(ownerRole);
+    checkNotNull(quota);
+    checkNotNull(session);
+
+    // TODO(Kevin Sweeney): Input validation for Quota.
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
+      }
+    });
+
+    return new Response().setResponseCode(OK).setMessage("Quota applied.");
+  }
+
+  /**
+   * Forces a task into the given state on behalf of a caller holding the ROOT capability.
+   *
+   * @param taskId ID of the task to transition, must not be blank.
+   * @param status State to move the task to.
+   * @param session Session of the caller.
+   * @return AUTH_FAILED when authorization fails, otherwise OK once the transition has been
+   *     handed to the scheduler core.
+   */
+  @Override
+  public Response forceTaskState(
+      String taskId,
+      ScheduleStatus status,
+      SessionKey session) {
+
+    checkNotBlank(taskId);
+    checkNotNull(status);
+    checkNotNull(session);
+
+    SessionContext caller;
+    try {
+      // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
+      caller = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
+    } catch (AuthFailedException e) {
+      return new Response().setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    }
+
+    Optional<String> auditMessage = transitionMessage(caller.getIdentity());
+    schedulerCore.setTaskStatus(Query.taskScoped(taskId), status, auditMessage);
+    return new Response().setResponseCode(OK).setMessage("Transition attempted.");
+  }
+
+  @Override
+  public Response performBackup(SessionKey session) {
+    backup.backupNow();
+    return new Response().setResponseCode(OK);
+  }
+
+  /**
+   * Lists the backups known to the recovery component.
+   *
+   * @param session Session of the caller (not inspected in this method).
+   * @return An OK response carrying the backups returned by {@code recovery.listBackups()}.
+   */
+  @Override
+  public Response listBackups(SessionKey session) {
+    ListBackupsResult available = new ListBackupsResult()
+        .setBackups(recovery.listBackups());
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.listBackupsResult(available));
+  }
+
+  @Override
+  public Response stageRecovery(String backupId, SessionKey session) {
+    Response response = new Response().setResponseCode(OK);
+    try {
+      recovery.stage(backupId);
+    } catch (RecoveryException e) {
+      response.setResponseCode(ERROR).setMessage(e.getMessage());
+      LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e);
+    }
+
+    return response;
+  }
+
+  @Override
+  public Response queryRecovery(TaskQuery query, SessionKey session) {
+    Response response = new Response();
+    try {
+      response.setResponseCode(OK)
+          .setResult(Result.queryRecoveryResult(new QueryRecoveryResult()
+              .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
+    } catch (RecoveryException e) {
+      response.setResponseCode(ERROR).setMessage(e.getMessage());
+      LOG.log(Level.WARNING, "Failed to query recovery: " + e, e);
+    }
+
+    return response;
+  }
+
+  @Override
+  public Response deleteRecoveryTasks(TaskQuery query, SessionKey session) {
+    Response response = new Response().setResponseCode(OK);
+    try {
+      recovery.deleteTasks(Query.arbitrary(query));
+    } catch (RecoveryException e) {
+      response.setResponseCode(ERROR).setMessage(e.getMessage());
+      LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e);
+    }
+
+    return response;
+  }
+
+  /**
+   * Commits the currently staged recovery.
+   *
+   * @param session Session of the caller (not inspected in this method).
+   * @return OK on success, or ERROR with the failure message when the commit fails.
+   */
+  @Override
+  public Response commitRecovery(SessionKey session) {
+    Response response = new Response().setResponseCode(OK);
+    try {
+      recovery.commit();
+    } catch (RecoveryException e) {
+      response.setResponseCode(ERROR).setMessage(e.getMessage());
+      // Log the failure with its stack trace, as the other recovery operations do; otherwise
+      // the cause of a failed commit is only visible to the client.
+      LOG.log(Level.WARNING, "Failed to commit recovery: " + e, e);
+    }
+
+    return response;
+  }
+
+  @Override
+  public Response unloadRecovery(SessionKey session) {
+    recovery.unload();
+    return new Response().setResponseCode(OK);
+  }
+
+  @Override
+  public Response snapshot(SessionKey session) {
+    Response response = new Response();
+    try {
+      storage.snapshot();
+      return response.setResponseCode(OK).setMessage("Compaction successful.");
+    } catch (Storage.StorageException e) {
+      LOG.log(Level.WARNING, "Requested snapshot failed.", e);
+      return response.setResponseCode(ERROR).setMessage(e.getMessage());
+    }
+  }
+
+  /**
+   * Collects every stored job whose key equals {@code jobKey}, indexed by the ID of the manager
+   * that holds it.
+   *
+   * @param jobStore Store to scan.
+   * @param jobKey Key to match against each stored job.
+   * @return Immutable multimap from manager ID to the matching job configurations.
+   */
+  private static Multimap<String, IJobConfiguration> jobsByKey(JobStore jobStore, IJobKey jobKey) {
+    ImmutableMultimap.Builder<String, IJobConfiguration> byManager = ImmutableMultimap.builder();
+    for (String managerId : jobStore.fetchManagerIds()) {
+      for (IJobConfiguration candidate : jobStore.fetchJobs(managerId)) {
+        if (!candidate.getKey().equals(jobKey)) {
+          continue;
+        }
+        byManager.put(managerId, candidate);
+      }
+    }
+    return byManager.build();
+  }
+
+  /**
+   * Applies a batch of configuration rewrite commands inside a single storage write.
+   *
+   * @param request Request holding the rewrite commands.
+   * @param session Session of the caller (not inspected in this method).
+   * @return ERROR when no commands were supplied, WARNING with the comma-joined error messages
+   *     when at least one command reported an error, OK otherwise.
+   */
+  @Override
+  public Response rewriteConfigs(
+      final RewriteConfigsRequest request,
+      SessionKey session) {
+
+    if (request.getRewriteCommandsSize() == 0) {
+      Response rejected = new Response();
+      return rejected
+        .setResponseCode(ResponseCode.ERROR)
+        .setMessage("No rewrite commands provided.");
+    }
+
+    MutateWork.Quiet<Response> applyRewrites = new MutateWork.Quiet<Response>() {
+      @Override public Response apply(MutableStoreProvider storeProvider) {
+        List<String> failures = Lists.newArrayList();
+
+        for (ConfigRewrite rewrite : request.getRewriteCommands()) {
+          Optional<String> failure = rewriteConfig(rewrite, storeProvider);
+          if (failure.isPresent()) {
+            failures.add(failure.get());
+          }
+        }
+
+        if (failures.isEmpty()) {
+          return new Response()
+              .setResponseCode(OK)
+              .setMessage("All rewrites completed successfully.");
+        }
+        return new Response()
+            .setResponseCode(ResponseCode.WARNING)
+            .setMessage(Joiner.on(", ").join(failures));
+      }
+    };
+    return storage.write(applyRewrites);
+  }
+
+  /**
+   * Applies a single rewrite command against storage.
+   *
+   * <p>Both command types behave as a compare-and-set: the stored configuration must equal the
+   * "old" configuration supplied in the command, otherwise nothing is written and an error
+   * message is returned.
+   *
+   * @param command Rewrite command, either a job rewrite or an instance rewrite.
+   * @param storeProvider Mutable stores to read the current configuration from and write to.
+   * @return An error message when the rewrite was not applied, absent otherwise.
+   * @throws IllegalArgumentException If the command type is not handled.
+   */
+  private Optional<String> rewriteConfig(
+      ConfigRewrite command,
+      MutableStoreProvider storeProvider) {
+
+    Optional<String> error = Optional.absent();
+    switch (command.getSetField()) {
+      case JOB_REWRITE:
+        JobConfigRewrite jobRewrite = command.getJobRewrite();
+        IJobConfiguration existingJob = IJobConfiguration.build(jobRewrite.getOldJob());
+        IJobConfiguration rewrittenJob;
+        try {
+          rewrittenJob = ConfigurationManager.validateAndPopulate(
+              IJobConfiguration.build(jobRewrite.getRewrittenJob()));
+        } catch (TaskDescriptionException e) {
+          // We could add an error here, but this is probably a hint of something wrong in
+          // the client that's causing a bad configuration to be applied.
+          throw Throwables.propagate(e);
+        }
+        // A rewrite may not change the identity (key) or the owner of a job.
+        if (!existingJob.getKey().equals(rewrittenJob.getKey())) {
+          error = Optional.of("Disallowing rewrite attempting to change job key.");
+        } else if (!existingJob.getOwner().equals(rewrittenJob.getOwner())) {
+          error = Optional.of("Disallowing rewrite attempting to change job owner.");
+        } else {
+          JobStore.Mutable jobStore = storeProvider.getJobStore();
+          Multimap<String, IJobConfiguration> matches =
+              jobsByKey(jobStore, existingJob.getKey());
+          // Exactly one stored job must match the key for the rewrite to proceed.
+          switch (matches.size()) {
+            case 0:
+              error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
+              break;
+
+            case 1:
+              Map.Entry<String, IJobConfiguration> match =
+                  Iterables.getOnlyElement(matches.entries());
+              IJobConfiguration storedJob = match.getValue();
+              // Compare-and-set: only overwrite when the stored job equals the caller's view.
+              if (!storedJob.equals(existingJob)) {
+                error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
+              } else {
+                // The entry key is the ID of the manager that holds the job.
+                jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
+              }
+              break;
+
+            default:
+              error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
+          }
+        }
+        break;
+
+      case INSTANCE_REWRITE:
+        InstanceConfigRewrite instanceRewrite = command.getInstanceRewrite();
+        InstanceKey instanceKey = instanceRewrite.getInstanceKey();
+        Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(
+            Query.instanceScoped(IJobKey.build(instanceKey.getJobKey()),
+                instanceKey.getInstanceId())
+                .active());
+        // getOnlyElement yields null for an empty result and throws IllegalArgumentException
+        // when more than one active task matches the instance.
+        Optional<IAssignedTask> task =
+            Optional.fromNullable(Iterables.getOnlyElement(tasks, null))
+                .transform(Tasks.SCHEDULED_TO_ASSIGNED);
+        if (!task.isPresent()) {
+          error = Optional.of("No active task found for " + instanceKey);
+        } else if (!task.get().getTask().newBuilder().equals(instanceRewrite.getOldTask())) {
+          // Compare-and-set: the stored task config must equal the caller's "old" config.
+          error = Optional.of("CAS compare failed for " + instanceKey);
+        } else {
+          ITaskConfig newConfiguration = ITaskConfig.build(
+              ConfigurationManager.applyDefaultsIfUnset(instanceRewrite.getRewrittenTask()));
+          boolean changed = storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(
+              task.get().getTaskId(), newConfiguration);
+          if (!changed) {
+            error = Optional.of("Did not change " + task.get().getTaskId());
+          }
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("Unhandled command type " + command.getSetField());
+    }
+
+    return error;
+  }
+
+  @Override
+  public Response getVersion() {
+    return new Response()
+        .setResponseCode(OK)
+        .setResult(Result.getVersionResult(CURRENT_API_VERSION));
+  }
+
+  /**
+   * Adds new instances to an existing, non-cron job.
+   *
+   * @param config Job key, instance IDs and task configuration for the new instances.
+   * @param mutableLock Lock presented by the caller, may be {@code null}.
+   * @param session Session of the caller, authenticated against the job's role.
+   * @return A response carrying OK, AUTH_FAILED, LOCK_ERROR or INVALID_REQUEST.
+   */
+  @Override
+  public Response addInstances(
+      AddInstancesConfig config,
+      @Nullable Lock mutableLock,
+      SessionKey session) {
+
+    checkNotNull(config);
+    checkNotNull(session);
+    checkNotBlank(config.getInstanceIds());
+    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
+
+    Response response = new Response();
+    try {
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+      ITaskConfig populatedTask = ConfigurationManager.validateAndPopulate(
+          ITaskConfig.build(config.getTaskConfig()));
+
+      if (cronJobManager.hasJob(jobKey)) {
+        response.setResponseCode(INVALID_REQUEST);
+        return response.setMessage("Cron jobs are not supported here.");
+      }
+
+      ILockKey jobLockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+      lockManager.validateIfLocked(
+          jobLockKey,
+          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+
+      schedulerCore.addInstances(
+          jobKey,
+          ImmutableSet.copyOf(config.getInstanceIds()),
+          populatedTask);
+      response.setResponseCode(OK);
+      return response.setMessage("Successfully added instances.");
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    } catch (LockException e) {
+      return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+    } catch (TaskDescriptionException | ScheduleException e) {
+      return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+    }
+  }
+
+  /**
+   * Extracts the role that owns the entity a lock key points at.
+   *
+   * @param lockKey Lock key to inspect; only job keys are handled.
+   * @return Role of the job referenced by the lock key.
+   * @throws IllegalArgumentException If the lock key is of an unhandled type.
+   */
+  private String getRoleFromLockKey(ILockKey lockKey) {
+    switch (lockKey.getSetField()) {
+      case JOB:
+        IJobKey lockedJob = lockKey.getJob();
+        JobKeys.assertValid(lockedJob);
+        return lockedJob.getRole();
+      default:
+        throw new IllegalArgumentException("Unhandled LockKey: " + lockKey.getSetField());
+    }
+  }
+
+  /**
+   * Acquires a lock on behalf of the authenticated caller.
+   *
+   * @param mutableLockKey Key of the entity to lock.
+   * @param session Session of the caller, authenticated against the role derived from the key.
+   * @return OK with the acquired lock, or AUTH_FAILED / LOCK_ERROR with the failure message.
+   */
+  @Override
+  public Response acquireLock(LockKey mutableLockKey, SessionKey session) {
+    checkNotNull(mutableLockKey);
+    checkNotNull(session);
+
+    ILockKey lockKey = ILockKey.build(mutableLockKey);
+    Response result = new Response();
+
+    try {
+      String role = getRoleFromLockKey(lockKey);
+      SessionContext caller = sessionValidator.checkAuthenticated(session, ImmutableSet.of(role));
+
+      ILock acquired = lockManager.acquireLock(lockKey, caller.getIdentity());
+      AcquireLockResult lockResult = new AcquireLockResult().setLock(acquired.newBuilder());
+      result.setResult(Result.acquireLockResult(lockResult));
+
+      return result.setResponseCode(OK).setMessage("Lock has been acquired.");
+    } catch (AuthFailedException e) {
+      return result.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    } catch (LockException e) {
+      return result.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
+    }
+  }
+
+  @Override
+  public Response releaseLock(Lock mutableLock, LockValidation validation, SessionKey session) {
+    checkNotNull(mutableLock);
+    checkNotNull(validation);
+    checkNotNull(session);
+
+    Response response = new Response();
+    ILock lock = ILock.build(mutableLock);
+
+    try {
+      sessionValidator.checkAuthenticated(
+          session,
+          ImmutableSet.of(getRoleFromLockKey(lock.getKey())));
+
+      if (validation == LockValidation.CHECKED) {
+        lockManager.validateIfLocked(lock.getKey(), Optional.of(lock));
+      }
+      lockManager.releaseLock(lock);
+      return response.setResponseCode(OK).setMessage("Lock has been released.");
+    } catch (AuthFailedException e) {
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+    } catch (LockException e) {
+      return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
+    }
+  }
+
+  /**
+   * Builds the audit message attached to a forced task state transition.
+   *
+   * @param user Identity of the user forcing the transition.
+   * @return The message, always present.
+   */
+  @VisibleForTesting
+  static Optional<String> transitionMessage(String user) {
+    String message = "Transition forced by " + user;
+    return Optional.of(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
new file mode 100644
index 0000000..c6e9b18
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Optional;
+
+/**
+ * Container for thrift server configuration options.
+ *
+ * <p>Supplies the port the thrift server listens on and, optionally, the SSL key material to
+ * serve with.
+ */
+public interface ThriftConfiguration {
+  /**
+   * Gets a stream for the thrift socket SSL key if this server is configured to use SSL.
+   *
+   * <p>NOTE(review): this interface does not state who is responsible for closing the returned
+   * stream; presumably the caller that consumes it. Confirm against implementations.
+   *
+   * @return A stream that contains the SSL key data if SSL is enabled, absent otherwise.
+   * @throws IOException If the stream could not be opened.
+   */
+  Optional<? extends InputStream> getSslKeyStream() throws IOException;
+
+  /**
+   * Gets the port that the thrift server should listen on.
+   *
+   * @return Thrift server port.
+   */
+  int getServingPort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
new file mode 100644
index 0000000..cca9053
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.scheduler.thrift.aop.AopModule;
+import com.twitter.common.application.http.Registration;
+import com.twitter.common.application.modules.LifecycleModule;
+
+/**
+ * Binding module to configure a thrift server.
+ *
+ * <p>Wires the thrift API implementation, the server that hosts it, the launcher that starts
+ * it as part of the application lifecycle, the HTTP servlet exposed under {@code /api}, and
+ * the AOP decorations installed by {@link AopModule}.
+ */
+public class ThriftModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    // The thrift API surface is implemented by SchedulerThriftInterface.
+    bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
+    // A single ThriftServer instance is shared by everything that injects it.
+    bind(ThriftServer.class).in(Singleton.class);
+    // ThriftServerLauncher is registered as a service runner with the lifecycle module.
+    LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
+
+    // NOTE(review): the meaning of the trailing boolean is not visible here; confirm against
+    // Registration.registerServlet.
+    Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
+
+    install(new AopModule());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
new file mode 100644
index 0000000..7b9abd1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.net.ServerSocket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+
+import com.twitter.thrift.Status;
+
+/**
+ * Hosts a thrift {@link TThreadPoolServer} on a caller-supplied socket and tracks a coarse
+ * health status for it.
+ *
+ * <p>{@link #start}, {@link #shutdown} and the internal status setter are all synchronized on
+ * this instance. The serve loop itself runs on a separate, non-daemon thread.
+ */
+class ThriftServer {
+  private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
+
+  // The server currently being hosted, or null when none has been started or after shutdown.
+  private TServer server = null;
+
+  // Current health status of the server.
+  private Status status = Status.STARTING;
+
+  /**
+   * Starts the server.
+   * This may be called at any point except when the server is already alive.  That is, it's
+   * allowable to start, stop, and re-start the server.
+   *
+   * @param socket The socket to use.
+   * @param processor The processor to handle requests.
+   */
+  public synchronized void start(ServerSocket socket, TProcessor processor) {
+    Preconditions.checkNotNull(socket);
+    Preconditions.checkNotNull(processor);
+    Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
+    // Note: the status is flipped to ALIVE before the underlying server is constructed.
+    setStatus(Status.ALIVE);
+    // Per the Thrift API the factory arguments are (strictRead, strictWrite) - verify against
+    // the Thrift version in use.
+    TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket))
+        .processor(processor)
+        .protocolFactory(new TBinaryProtocol.Factory(false, true));
+
+    // Keep a final reference for the listening thread; the 'server' field may be reassigned
+    // or cleared by a later start() or shutdown().
+    final TServer starting = new TThreadPoolServer(args);
+    server = starting;
+    LOG.info("Starting thrift server on port " + socket.getLocalPort());
+
+    // serve() is run on its own non-daemon thread so start() can return to the caller.
+    Thread listeningThread = new ThreadFactoryBuilder().setDaemon(false).build()
+        .newThread(new Runnable() {
+          @Override public void run() {
+            try {
+              starting.serve();
+            } catch (Throwable t) {
+              LOG.log(Level.WARNING,
+                  "Uncaught exception while attempting to handle service requests: " + t, t);
+              // Only an exceptional exit from serve() marks the server DEAD.
+              setStatus(Status.DEAD);
+            }
+          }
+    });
+
+    listeningThread.start();
+  }
+
+  /**
+   * Records a status transition, logging the previous and the new status.
+   *
+   * @param status The status to move to.
+   */
+  private synchronized void setStatus(Status status) {
+    LOG.info("Moving from status " + this.status + " to " + status);
+    this.status = status;
+  }
+
+  /**
+   * Attempts to shut down the server.
+   * The server may be shut down at any time, though the request will be ignored if the server is
+   * already stopped.
+   */
+  public synchronized void shutdown() {
+    if (status == Status.STOPPED) {
+      LOG.info("Server already stopped, shutdown request ignored.");
+      return;
+    }
+
+    LOG.info("Received shutdown request, stopping server.");
+    setStatus(Status.STOPPING);
+
+    // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
+    // is stopped.  Might want to allow a sleep period for the active requests to be completed.
+
+    // The server is null when shutdown() is called before any start().
+    if (server != null) {
+      server.stop();
+    }
+
+    server = null;
+    setStatus(Status.STOPPED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
new file mode 100644
index 0000000..6743060
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.AuroraAdmin.Iface;
+import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
+import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
+import com.twitter.common.base.Command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Service launcher that starts up and registers the scheduler thrift server as a primary service
+ * for the application.
+ *
+ * <p>The server socket is plain or SSL depending on whether the {@link ThriftConfiguration}
+ * supplies an SSL key stream.
+ */
+class ThriftServerLauncher implements ServiceRunner {
+
+  private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
+
+  private final ThriftConfiguration configuration;
+
+  // Security is enforced via file permissions, not via this password, for what it's worth.
+  private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
+
+  private final Iface schedulerThriftInterface;
+  private final ThriftServer schedulerThriftServer;
+
+  /**
+   * Creates a launcher.
+   *
+   * @param schedulerThriftInterface Thrift API implementation to serve.
+   * @param schedulerThriftServer Server that will host the API.
+   * @param configuration Port and SSL settings for the server socket.
+   */
+  @Inject
+  ThriftServerLauncher(
+      Iface schedulerThriftInterface,
+      ThriftServer schedulerThriftServer,
+      ThriftConfiguration configuration) {
+
+    this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
+    this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
+    this.configuration = checkNotNull(configuration);
+  }
+
+  /**
+   * Opens the server socket, starts the thrift server on it and returns a handle that reports
+   * the bound port and can shut the server down again.
+   *
+   * @return The primary service descriptor for the running thrift server.
+   */
+  @Override
+  public LocalService launch() {
+    ServerSocket socket = getServerSocket();
+    schedulerThriftServer.start(
+        socket,
+        new AuroraAdmin.Processor<>(schedulerThriftInterface));
+
+    Command shutdown = new Command() {
+      @Override public void execute() {
+        LOG.info("Stopping thrift server.");
+        schedulerThriftServer.shutdown();
+      }
+    };
+
+    return LocalService.primaryService(socket.getLocalPort(), shutdown);
+  }
+
+  /**
+   * Creates the socket the thrift server listens on.
+   *
+   * @return A plain socket when no SSL key stream is configured, otherwise an SSL server socket
+   *     backed by the configured key store.
+   * @throws RuntimeException If the socket cannot be opened, the key data cannot be read, or
+   *     the SSL context cannot be set up.
+   */
+  private ServerSocket getServerSocket() {
+    try {
+      Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
+      if (!sslKeyStream.isPresent()) {
+        LOG.warning("Running Thrift Server without SSL.");
+        return new ServerSocket(configuration.getServingPort());
+      } else {
+        // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
+        KeyStore ks = KeyStore.getInstance("JKS");
+        // KeyStore.load does not close the stream it reads from, so close it here once the
+        // key data has been loaded to avoid leaking the underlying file handle.
+        try (InputStream keyStream = sslKeyStream.get()) {
+          ks.load(keyStream, SSL_KEYFILE_PASSWORD.toCharArray());
+        }
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
+
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(kmf.getKeyManagers(), null, null);
+
+        SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
+        SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
+            configuration.getServingPort());
+        // NOTE(review): this enables every cipher suite the JVM supports, which may include
+        // weak or unauthenticated suites. Left unchanged to stay compatible with existing
+        // clients; consider restricting to a vetted list.
+        serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
+        serverSocket.setNeedClientAuth(false);
+        return serverSocket;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read key file.", e);
+    } catch (GeneralSecurityException e) {
+      throw new RuntimeException("SSL setup failed.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
new file mode 100644
index 0000000..d66a2b2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
@@ -0,0 +1,20 @@
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.gen.Response;
+
+import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
+
+/**
+ * Method interceptor that stamps {@code CURRENT_API_VERSION} onto any intercepted
+ * {@link Response} that does not already carry a version.
+ */
+class APIVersionInterceptor implements MethodInterceptor {
+
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    Response response = (Response) invocation.proceed();
+    if (response.version != null) {
+      // A version was already set by the intercepted method; leave it untouched.
+      return response;
+    }
+    response.setVersion(CURRENT_API_VERSION);
+    return response;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
new file mode 100644
index 0000000..4afc263
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.google.inject.matcher.Matcher;
+import com.google.inject.matcher.Matchers;
+
+import org.aopalliance.intercept.MethodInterceptor;
+
+import com.twitter.aurora.GuiceUtils;
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.AuroraSchedulerManager;
+import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+
+/**
+ * Guice module that layers AOP-style decorators over the thrift API implementation.
+ * <p>
+ * Decorators are attached to classes that implement {@link AuroraAdmin.Iface} and carry the
+ * {@link DecoratedThrift} annotation.
+ */
+public class AopModule extends AbstractModule {
+
+  @CmdLine(name = "enable_job_updates", help = "Whether new job updates should be accepted.")
+  private static final Arg<Boolean> ENABLE_UPDATES = Arg.create(true);
+
+  @CmdLine(name = "enable_job_creation",
+      help = "Allow new jobs to be created, if false all job creation requests will be denied.")
+  private static final Arg<Boolean> ENABLE_JOB_CREATION = Arg.create(true);
+
+  private static final Matcher<? super Class<?>> THRIFT_IFACE_MATCHER =
+      Matchers.subclassesOf(AuroraAdmin.Iface.class)
+          .and(Matchers.annotatedWith(DecoratedThrift.class));
+
+  private static final Function<Method, String> GET_NAME = new Function<Method, String>() {
+    @Override public String apply(Method method) {
+      return method.getName();
+    }
+  };
+
+  // Method name -> whether calls to that method are currently permitted.
+  private final Map<String, Boolean> toggledMethods;
+
+  /**
+   * Creates a module whose feature toggles come from the {@code enable_job_creation} and
+   * {@code enable_job_updates} command line arguments.
+   */
+  public AopModule() {
+    this(ImmutableMap.of(
+        "createJob", ENABLE_JOB_CREATION.get(),
+        "acquireLock", ENABLE_UPDATES.get()));
+  }
+
+  /**
+   * Creates a module with an explicit set of feature toggles.
+   *
+   * @param toggledMethods Maps a method name of {@link AuroraSchedulerManager.Iface} to whether
+   *     that method is enabled. The map is copied.
+   */
+  @VisibleForTesting
+  AopModule(Map<String, Boolean> toggledMethods) {
+    this.toggledMethods = ImmutableMap.copyOf(toggledMethods);
+  }
+
+  @Override
+  protected void configure() {
+    requireBinding(CapabilityValidator.class);
+
+    // Layer ordering (mirrored by the order of the bindings below):
+    // Log -> CapabilityValidator -> FeatureToggle -> StatsExporter -> APIVersion ->
+    // SchedulerThriftInterface
+
+    // TODO(Sathya): Consider using provider pattern for constructing interceptors to facilitate
+    // unit testing without the creation of Guice injectors.
+    bindThriftDecorator(new LoggingInterceptor());
+
+    // Note: it's important that the capability interceptor is only applied to AuroraAdmin.Iface
+    // methods, and does not pick up methods on AuroraSchedulerManager.Iface.
+    MethodInterceptor capabilityInterceptor = new UserCapabilityInterceptor();
+    requestInjection(capabilityInterceptor);
+    bindInterceptor(
+        THRIFT_IFACE_MATCHER,
+        GuiceUtils.interfaceMatcher(AuroraAdmin.Iface.class, true),
+        capabilityInterceptor);
+
+    install(featureToggleModule());
+    bindThriftDecorator(new FeatureToggleInterceptor());
+    bindThriftDecorator(new ThriftStatsExporterInterceptor());
+    bindThriftDecorator(new APIVersionInterceptor());
+  }
+
+  /**
+   * Builds the private module that validates the configured toggles and exposes a
+   * {@code Predicate<Method>} backed by {@link IsFeatureEnabled}.
+   */
+  private PrivateModule featureToggleModule() {
+    return new PrivateModule() {
+      @Override protected void configure() {
+        // Ensure that the provided methods exist on the decorated interface.
+        List<Method> ifaceMethods =
+            ImmutableList.copyOf(AuroraSchedulerManager.Iface.class.getMethods());
+        for (String toggledMethod : toggledMethods.keySet()) {
+          boolean declared = Iterables.any(
+              ifaceMethods,
+              Predicates.compose(Predicates.equalTo(toggledMethod), GET_NAME));
+          Preconditions.checkArgument(
+              declared,
+              "Method %s was not found in class %s",
+              toggledMethod,
+              AuroraSchedulerManager.Iface.class);
+        }
+
+        // The toggle map stays private to this module; only the predicate is exposed.
+        bind(new TypeLiteral<Map<String, Boolean>>() { }).toInstance(toggledMethods);
+        bind(IsFeatureEnabled.class).in(Singleton.class);
+        Key<Predicate<Method>> predicateKey = Key.get(new TypeLiteral<Predicate<Method>>() { });
+        bind(predicateKey).to(IsFeatureEnabled.class);
+        expose(predicateKey);
+      }
+    };
+  }
+
+  private void bindThriftDecorator(MethodInterceptor interceptor) {
+    bindThriftDecorator(binder(), THRIFT_IFACE_MATCHER, interceptor);
+  }
+
+  /**
+   * Binds an interceptor to every method of the classes selected by {@code classMatcher} and
+   * requests injection of the interceptor's own members.
+   *
+   * @param binder Binder to register the interceptor with.
+   * @param classMatcher Selects the classes whose methods are intercepted.
+   * @param interceptor Interceptor to bind.
+   */
+  @VisibleForTesting
+  static void bindThriftDecorator(
+      Binder binder,
+      Matcher<? super Class<?>> classMatcher,
+      MethodInterceptor interceptor) {
+
+    binder.bindInterceptor(classMatcher, Matchers.any(), interceptor);
+    binder.requestInjection(interceptor);
+  }
+
+  /**
+   * Predicate that accepts a method unless its name is mapped to {@code false} in the injected
+   * toggle map.
+   */
+  private static class IsFeatureEnabled implements Predicate<Method> {
+    private final Predicate<String> methodEnabled;
+
+    @Inject
+    IsFeatureEnabled(Map<String, Boolean> toggleMethods) {
+      // Only names explicitly mapped to false can ever be rejected, so collecting those names
+      // is sufficient; names mapped to true and unknown names are always accepted.
+      ImmutableList.Builder<String> disabledNames = ImmutableList.builder();
+      for (Map.Entry<String, Boolean> toggle : toggleMethods.entrySet()) {
+        if (!toggle.getValue()) {
+          disabledNames.add(toggle.getKey());
+        }
+      }
+      methodEnabled = Predicates.not(Predicates.in(disabledNames.build()));
+    }
+
+    @Override
+    public boolean apply(Method method) {
+      return methodEnabled.apply(method.getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
new file mode 100644
index 0000000..03c3d99
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Predicate;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.gen.ResponseCode;
+
+/**
+ * A method interceptor that rejects calls to methods the injected predicate does not allow and
+ * lets every other call through untouched.
+ */
+public class FeatureToggleInterceptor implements MethodInterceptor {
+
+  // Decides, per intercepted method, whether the call may proceed.
+  @Inject private Predicate<Method> allowMethod;
+
+  /**
+   * Proceeds with the call when the predicate accepts the method; otherwise returns an
+   * {@code ERROR} response of the method's return type without invoking it.
+   *
+   * @param invocation The intercepted call.
+   * @return The result of the call, or an error response when the method is disabled.
+   * @throws Throwable Anything thrown by the intercepted call or while building the response.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    Method method = invocation.getMethod();
+    if (allowMethod.apply(method)) {
+      return invocation.proceed();
+    }
+
+    String disabledMessage =
+        "The " + method.getName() + " feature is currently disabled on this scheduler.";
+    return Interceptors.properlyTypedResponse(method, ResponseCode.ERROR, disabledMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
new file mode 100644
index 0000000..d0cb9c1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.logging.Logger;
+
+import com.google.common.base.Throwables;
+
+import com.twitter.aurora.gen.ResponseCode;
+
+/**
+ * Static helpers shared by the interceptors that decorate the thrift interface.
+ */
+final class Interceptors {
+
+  private static final Logger LOG = Logger.getLogger(Interceptors.class.getName());
+
+  private Interceptors() {
+    // Utility class.
+  }
+
+  /**
+   * Builds a response of the intercepted method's own return type and populates it reflectively
+   * through its {@code setResponseCode} and {@code setMessage} setters.
+   *
+   * @param method Method whose return type determines the class of the response.
+   * @param responseCode Code to store in the response.
+   * @param message Message to store in the response.
+   * @return A newly created instance of the method's return type.
+   * @throws IllegalAccessException If the return type's no-arg constructor is not accessible.
+   * @throws InstantiationException If the return type cannot be instantiated.
+   */
+  static Object properlyTypedResponse(Method method, ResponseCode responseCode, String message)
+      throws IllegalAccessException, InstantiationException {
+
+    Class<?> responseType = method.getReturnType();
+    Object response = responseType.newInstance();
+    invoke(responseType, response, "setResponseCode", ResponseCode.class, responseCode);
+    invoke(responseType, response, "setMessage", String.class, message);
+    return response;
+  }
+
+  /**
+   * Looks up the public single-argument method {@code name} on {@code type} and calls it on
+   * {@code obj}. Reflection failures are logged and rethrown via {@link Throwables#propagate}.
+   */
+  private static <A> void invoke(
+      Class<?> type,
+      Object obj,
+      String name,
+      Class<A> parameterType,
+      A argument) {
+
+    try {
+      Method setter = type.getMethod(name, parameterType);
+      setter.invoke(obj, argument);
+    } catch (NoSuchMethodException e) {
+      LOG.severe(type + " does not support " + name);
+      throw Throwables.propagate(e);
+    } catch (IllegalAccessException e) {
+      LOG.severe("Method " + name + " is not accessible in " + type);
+      throw Throwables.propagate(e);
+    } catch (InvocationTargetException e) {
+      LOG.severe("Failed to invoke " + name + " on " + type);
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
new file mode 100644
index 0000000..5f773cc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.gen.ExecutorConfig;
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.SessionKey;
+
+import static com.twitter.aurora.scheduler.thrift.aop.Interceptors.properlyTypedResponse;
+
+/**
+ * A method interceptor that logs every invocation, and that logs any unchecked exception
+ * thrown by the underlying call before turning it into an error response.
+ */
+class LoggingInterceptor implements MethodInterceptor {
+
+  private static final Logger LOG = Logger.getLogger(LoggingInterceptor.class.getName());
+
+  @Inject private CapabilityValidator validator;
+
+  // Custom renderers, keyed by exact argument class, used in place of toString() when logging.
+  // TODO(wfarner): Scrub updateToken when it is identifiable by type.
+  private final Map<Class<?>, Function<Object, String>> printFunctions =
+      ImmutableMap.<Class<?>, Function<Object, String>>of(
+          JobConfiguration.class,
+          new Function<Object, String>() {
+            @Override public String apply(Object input) {
+              // Blank the executor config on a copy so the caller's object is left untouched.
+              JobConfiguration scrubbed = ((JobConfiguration) input).deepCopy();
+              if (scrubbed.isSetTaskConfig()) {
+                ExecutorConfig blanked = new ExecutorConfig("BLANKED", "BLANKED");
+                scrubbed.getTaskConfig().setExecutorConfig(blanked);
+              }
+              return scrubbed.toString();
+            }
+          },
+          SessionKey.class,
+          new Function<Object, String>() {
+            @Override public String apply(Object input) {
+              return validator.toString((SessionKey) input);
+            }
+          }
+      );
+
+  /**
+   * Logs the call as {@code method(arg, ...)} and proceeds with it. A {@link RuntimeException}
+   * from the call is logged at WARNING and replaced by an {@code ERROR} response carrying the
+   * exception's message; other throwables propagate unchanged.
+   *
+   * @param invocation The intercepted call.
+   * @return The result of the call, or an error response if it threw an unchecked exception.
+   * @throws Throwable Any non-runtime throwable raised by the call.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    List<String> renderedArgs = Lists.newArrayList();
+    for (Object argument : invocation.getArguments()) {
+      renderedArgs.add(render(argument));
+    }
+    String call = String.format(
+        "%s(%s)", invocation.getMethod().getName(), Joiner.on(", ").join(renderedArgs));
+    LOG.info(call);
+    try {
+      return invocation.proceed();
+    } catch (RuntimeException e) {
+      LOG.log(Level.WARNING, "Uncaught exception while handling " + call, e);
+      return properlyTypedResponse(invocation.getMethod(), ResponseCode.ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Renders one argument for the log line, preferring a registered print function for the
+   * argument's class over {@code toString()}.
+   */
+  private String render(Object argument) {
+    if (argument == null) {
+      return "null";
+    }
+    Function<Object, String> printer = printFunctions.get(argument.getClass());
+    return (printer != null) ? printer.apply(argument) : argument.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
new file mode 100644
index 0000000..d700ab5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.stats.Stats;
+
+/**
+ * A method interceptor that records how long each thrift call takes, keeping one
+ * {@link SlidingStats} instance per intercepted method.
+ */
+class ThriftStatsExporterInterceptor implements MethodInterceptor {
+
+  // Creates the stat for a method on first lookup; later lookups reuse the cached instance.
+  private final LoadingCache<Method, SlidingStats> stats =
+      CacheBuilder.newBuilder().build(new CacheLoader<Method, SlidingStats>() {
+        @Override public SlidingStats load(Method method) {
+          String statName = Stats.normalizeName("scheduler_thrift_" + method.getName());
+          return new SlidingStats(statName, "nanos");
+        }
+      });
+
+  /**
+   * Proceeds with the call and accumulates its elapsed time, in nanoseconds, into the stat for
+   * the invoked method.
+   *
+   * @param invocation The intercepted call.
+   * @return The result of the intercepted call.
+   * @throws Throwable Anything thrown by the stat lookup or by the intercepted call.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    SlidingStats callTimer = stats.get(invocation.getMethod());
+    long startNanos = System.nanoTime();
+    try {
+      return invocation.proceed();
+    } finally {
+      // Recorded in finally so that calls ending in an exception are timed as well.
+      long elapsedNanos = System.nanoTime() - startNanos;
+      callTimer.accumulate(elapsedNanos);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
new file mode 100644
index 0000000..d9240bc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.SessionKey;
+import com.twitter.aurora.scheduler.thrift.auth.Requires;
+
+/**
+ * A method interceptor that authorizes the caller identified by the {@link SessionKey} argument
+ * of an invoked method.
+ * <p>
+ * {@link Capability#ROOT} is always sufficient. A method annotated with {@link Requires} may
+ * additionally be called by holders of any capability in the annotation's whitelist.
+ */
+class UserCapabilityInterceptor implements MethodInterceptor {
+  private static final Logger LOG = Logger.getLogger(UserCapabilityInterceptor.class.getName());
+
+  private static final Function<Object, SessionKey> CAST = new Function<Object, SessionKey>() {
+    @Override public SessionKey apply(Object o) {
+      return (SessionKey) o;
+    }
+  };
+
+  @Inject private CapabilityValidator capabilityValidator;
+
+  /**
+   * Proceeds with the call if the session holds at least one whitelisted capability; otherwise
+   * returns an {@code AUTH_FAILED} response of the method's return type.
+   *
+   * @param invocation The intercepted call.
+   * @return The result of the call, or an auth failure response.
+   * @throws Throwable Anything thrown by the intercepted call or while building the response.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    Preconditions.checkNotNull(capabilityValidator, "Session validator has not yet been set.");
+
+    Method method = invocation.getMethod();
+    List<Capability> whitelist = permittedCapabilities(method);
+    LOG.fine("Operation " + method.getName() + " may be performed by: " + whitelist);
+
+    Optional<SessionKey> sessionKey = firstSessionKey(invocation.getArguments());
+    if (!sessionKey.isPresent()) {
+      // NOTE: without a SessionKey argument the call is let through with no capability check.
+      LOG.severe("Interceptor should only be applied to methods accepting a SessionKey, but "
+          + method + " does not.");
+      return invocation.proceed();
+    }
+
+    SessionKey session = sessionKey.get();
+    String key = capabilityValidator.toString(session);
+    for (Capability capability : whitelist) {
+      LOG.fine("Attempting to validate " + key + " against " + capability);
+      try {
+        capabilityValidator.checkAuthorized(session, capability, AuditCheck.NONE);
+
+        LOG.info("Permitting " + key + " to act as "
+            + capability + " and perform action " + method.getName());
+        return invocation.proceed();
+      } catch (AuthFailedException e) {
+        // Not authorized for this capability; fall through and try the next one.
+        LOG.fine("Auth failed: " + e);
+      }
+    }
+
+    // User is not permitted to perform this operation.
+    String denial = "Session identified by '" + key
+        + "' does not have the required capability to perform this action: " + whitelist;
+    return Interceptors.properlyTypedResponse(method, ResponseCode.AUTH_FAILED, denial);
+  }
+
+  /**
+   * Returns the capabilities allowed to call {@code method}: ROOT first, followed by the
+   * whitelist of the method's {@link Requires} annotation when one is present.
+   */
+  private static List<Capability> permittedCapabilities(Method method) {
+    ImmutableList.Builder<Capability> permitted =
+        ImmutableList.<Capability>builder().add(Capability.ROOT);
+    Requires requires = method.getAnnotation(Requires.class);
+    if (requires != null) {
+      permitted.add(requires.whitelist());
+    }
+    return permitted.build();
+  }
+
+  /**
+   * Returns the first argument that is a {@link SessionKey}, or absent when there is none.
+   */
+  private static Optional<SessionKey> firstSessionKey(Object[] arguments) {
+    return FluentIterable.from(Arrays.asList(arguments))
+        .firstMatch(Predicates.instanceOf(SessionKey.class))
+        .transform(CAST);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java b/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
new file mode 100644
index 0000000..4a667c5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.auth;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Type annotation to apply to a thrift interface implementation that should be decorated with
+ * additional functionality.
+ * <p>
+ * The annotation carries no members; it is retained at runtime and may be placed on types and
+ * on parameters.
+ */
+@Target({PARAMETER, TYPE}) @Retention(RUNTIME)
+public @interface DecoratedThrift {
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java b/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
new file mode 100644
index 0000000..0fff3f6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.auth;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Annotation applied to a method that may allow users with non-ROOT capabilities to perform
+ * an action.
+ * <p>
+ * The annotation is retained at runtime and may only be placed on methods.
+ */
+@Target(METHOD) @Retention(RUNTIME)
+public @interface Requires {
+  /**
+   * The list of capabilities permitted to perform the action. When the member is not
+   * specified it defaults to {@link Capability#ROOT} alone.
+   */
+  Capability[] whitelist() default { Capability.ROOT };
+}