You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:21 UTC
[28/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
new file mode 100644
index 0000000..d735828
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.testing;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Test helper that stands up a mocked {@link NonVolatileStorage} together with mocked stores and
+ * store providers. Tests can then set expectations on the individual stores directly instead of
+ * writing the plumbing that routes storage operations through a {@link StoreProvider}.
+ */
+public class StorageTestUtil {
+
+  public final StoreProvider storeProvider;
+  public final MutableStoreProvider mutableStoreProvider;
+  public final TaskStore.Mutable taskStore;
+  public final QuotaStore.Mutable quotaStore;
+  public final AttributeStore.Mutable attributeStore;
+  public final JobStore.Mutable jobStore;
+  public final LockStore.Mutable lockStore;
+  public final SchedulerStore.Mutable schedulerStore;
+  public final NonVolatileStorage storage;
+
+  /**
+   * Creates a new storage test utility, with one mock for each provider, each store, and the
+   * storage itself.
+   *
+   * @param easyMock Mocking framework to use for setting up mocks and expectations.
+   */
+  public StorageTestUtil(EasyMockTest easyMock) {
+    storeProvider = easyMock.createMock(StoreProvider.class);
+    mutableStoreProvider = easyMock.createMock(MutableStoreProvider.class);
+    taskStore = easyMock.createMock(TaskStore.Mutable.class);
+    quotaStore = easyMock.createMock(QuotaStore.Mutable.class);
+    attributeStore = easyMock.createMock(AttributeStore.Mutable.class);
+    jobStore = easyMock.createMock(JobStore.Mutable.class);
+    lockStore = easyMock.createMock(LockStore.Mutable.class);
+    schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
+    storage = easyMock.createMock(NonVolatileStorage.class);
+  }
+
+  // Expects a consistent read and answers it by applying the captured work to the mocked
+  // read-only store provider.
+  private <T> IExpectationSetters<T> expectConsistentRead() {
+    final Capture<Work<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    IAnswer<T> runCaptured = new IAnswer<T>() {
+      @Override public T answer() {
+        return captured.getValue().apply(storeProvider);
+      }
+    };
+    return expect(storage.consistentRead(capture(captured))).andAnswer(runCaptured);
+  }
+
+  // Expects a weakly consistent read and answers it by applying the captured work to the mocked
+  // read-only store provider.
+  private <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
+    final Capture<Work<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    IAnswer<T> runCaptured = new IAnswer<T>() {
+      @Override public T answer() {
+        return captured.getValue().apply(storeProvider);
+      }
+    };
+    return expect(storage.weaklyConsistentRead(capture(captured))).andAnswer(runCaptured);
+  }
+
+  // Expects a write and answers it by applying the captured work to the mocked mutable store
+  // provider.
+  private <T> IExpectationSetters<T> expectWriteOperation() {
+    final Capture<MutateWork<T, RuntimeException>> captured = EasyMockTest.createCapture();
+    IAnswer<T> runCaptured = new IAnswer<T>() {
+      @Override public T answer() {
+        return captured.getValue().apply(mutableStoreProvider);
+      }
+    };
+    return expect(storage.write(capture(captured))).andAnswer(runCaptured);
+  }
+
+  /**
+   * Expects any number of read or write operations.
+   * <p>
+   * Both providers hand out the same store mocks, and every storage operation is answered by
+   * running the supplied work against the matching provider.
+   */
+  public void expectOperations() {
+    // Read-only provider accessors.
+    expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
+    expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
+    expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
+    expect(storeProvider.getJobStore()).andReturn(jobStore).anyTimes();
+    expect(storeProvider.getLockStore()).andReturn(lockStore).anyTimes();
+    expect(storeProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
+
+    // Mutable provider accessors; the "unsafe" task store is the same mock as the regular one.
+    expect(mutableStoreProvider.getTaskStore()).andReturn(taskStore).anyTimes();
+    expect(mutableStoreProvider.getUnsafeTaskStore()).andReturn(taskStore).anyTimes();
+    expect(mutableStoreProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
+    expect(mutableStoreProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
+    expect(mutableStoreProvider.getJobStore()).andReturn(jobStore).anyTimes();
+    expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
+    expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
+
+    // Storage-level operations.
+    expectConsistentRead().anyTimes();
+    expectWeaklyConsistentRead().anyTimes();
+    expectWriteOperation().anyTimes();
+  }
+
+  /**
+   * Expects a task fetch on the task store mock for the given query.
+   *
+   * @param query Query the task store is expected to receive.
+   * @param result Tasks to return from the fetch.
+   * @return The expectation setters, for further customization by the caller.
+   */
+  public IExpectationSetters<?> expectTaskFetch(
+      Query.Builder query,
+      ImmutableSet<IScheduledTask> result) {
+
+    return expect(taskStore.fetchTasks(query)).andReturn(result);
+  }
+
+  /**
+   * Varargs convenience form of {@link #expectTaskFetch(Query.Builder, ImmutableSet)}.
+   *
+   * @param query Query the task store is expected to receive.
+   * @param result Tasks to return from the fetch.
+   * @return The expectation setters, for further customization by the caller.
+   */
+  public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
+    ImmutableSet<IScheduledTask> tasks = ImmutableSet.copyOf(result);
+    return expectTaskFetch(query, tasks);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
new file mode 100644
index 0000000..2acf5c8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerAPIServlet.java
@@ -0,0 +1,19 @@
+package com.twitter.aurora.scheduler.thrift;
+
+import javax.inject.Inject;
+
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.server.TServlet;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+
+/**
+ * Servlet that serves the scheduler Thrift API over HTTP, encoded with the Thrift JSON protocol.
+ */
+class SchedulerAPIServlet extends TServlet {
+
+  /**
+   * Creates the servlet.
+   *
+   * @param schedulerThriftInterface Thrift API implementation that requests are dispatched to.
+   */
+  @Inject
+  SchedulerAPIServlet(AuroraAdmin.Iface schedulerThriftInterface) {
+    super(
+        new AuroraAdmin.Processor<>(schedulerThriftInterface),
+        new TJSONProtocol.Factory());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
new file mode 100644
index 0000000..3ff0f1c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -0,0 +1,1000 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
+import com.twitter.aurora.gen.AcquireLockResult;
+import com.twitter.aurora.gen.AddInstancesConfig;
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.ConfigRewrite;
+import com.twitter.aurora.gen.DrainHostsResult;
+import com.twitter.aurora.gen.EndMaintenanceResult;
+import com.twitter.aurora.gen.GetJobsResult;
+import com.twitter.aurora.gen.GetQuotaResult;
+import com.twitter.aurora.gen.Hosts;
+import com.twitter.aurora.gen.InstanceConfigRewrite;
+import com.twitter.aurora.gen.InstanceKey;
+import com.twitter.aurora.gen.JobConfigRewrite;
+import com.twitter.aurora.gen.JobConfigValidation;
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.JobKey;
+import com.twitter.aurora.gen.JobSummary;
+import com.twitter.aurora.gen.JobSummaryResult;
+import com.twitter.aurora.gen.ListBackupsResult;
+import com.twitter.aurora.gen.Lock;
+import com.twitter.aurora.gen.LockKey;
+import com.twitter.aurora.gen.LockValidation;
+import com.twitter.aurora.gen.MaintenanceStatusResult;
+import com.twitter.aurora.gen.PopulateJobResult;
+import com.twitter.aurora.gen.QueryRecoveryResult;
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.gen.Response;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.Result;
+import com.twitter.aurora.gen.RewriteConfigsRequest;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduleStatusResult;
+import com.twitter.aurora.gen.SessionKey;
+import com.twitter.aurora.gen.StartMaintenanceResult;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskQuery;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.quota.Quotas;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.state.LockManager;
+import com.twitter.aurora.scheduler.state.LockManager.LockException;
+import com.twitter.aurora.scheduler.state.MaintenanceController;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.backup.Recovery;
+import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryException;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
+import com.twitter.aurora.scheduler.thrift.auth.Requires;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffHelper;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.auth.SessionValidator.SessionContext;
+import static com.twitter.aurora.gen.ResponseCode.AUTH_FAILED;
+import static com.twitter.aurora.gen.ResponseCode.ERROR;
+import static com.twitter.aurora.gen.ResponseCode.INVALID_REQUEST;
+import static com.twitter.aurora.gen.ResponseCode.LOCK_ERROR;
+import static com.twitter.aurora.gen.ResponseCode.OK;
+import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Aurora scheduler thrift server implementation.
+ * <p>
+ * Interfaces between users and the scheduler to access/modify jobs and perform cluster
+ * administration tasks.
+ */
+@DecoratedThrift
+class SchedulerThriftInterface implements AuroraAdmin.Iface {
+ private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());
+
+ // Command line argument supplying the initial backoff passed to the injected constructor's
+ // delegate; defaults to 1 second.
+ @CmdLine(name = "kill_task_initial_backoff",
+ help = "Initial backoff delay while waiting for the tasks to transition to KILLED.")
+ private static final Arg<Amount<Long, Time>> KILL_TASK_INITIAL_BACKOFF =
+ Arg.create(Amount.of(1L, Time.SECONDS));
+
+ // Command line argument supplying the maximum backoff passed to the injected constructor's
+ // delegate; defaults to 30 seconds.
+ @CmdLine(name = "kill_task_max_backoff",
+ help = "Max backoff delay while waiting for the tasks to transition to KILLED.")
+ private static final Arg<Amount<Long, Time>> KILL_TASK_MAX_BACKOFF =
+ Arg.create(Amount.of(30L, Time.SECONDS));
+
+ // Maps a scheduled task to the role of its owner: Tasks.SCHEDULED_TO_INFO is applied first and
+ // the owner's role is then read from the resulting ITaskConfig.
+ private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
+ new Function<ITaskConfig, String>() {
+ @Override public String apply(ITaskConfig task) {
+ return task.getOwner().getRole();
+ }
+ },
+ Tasks.SCHEDULED_TO_INFO);
+
+ // Collaborators and settings; all are assigned once, non-null, by the constructor.
+ private final Storage storage;
+ private final SchedulerCore schedulerCore;
+ private final LockManager lockManager;
+ private final CapabilityValidator sessionValidator;
+ private final StorageBackup backup;
+ private final Recovery recovery;
+ private final MaintenanceController maintenance;
+ private final CronJobManager cronJobManager;
+ // Backoff bounds used by killTasks while it polls for the affected tasks to go away.
+ private final Amount<Long, Time> killTaskInitialBackoff;
+ private final Amount<Long, Time> killTaskMaxBackoff;
+
+ /**
+  * Injection entry point. Delegates to the testing constructor, taking the kill-task backoff
+  * bounds from the {@code kill_task_initial_backoff} and {@code kill_task_max_backoff} command
+  * line arguments.
+  */
+ @Inject
+ SchedulerThriftInterface(
+     Storage storage,
+     SchedulerCore schedulerCore,
+     LockManager lockManager,
+     CapabilityValidator sessionValidator,
+     StorageBackup backup,
+     Recovery recovery,
+     CronJobManager cronJobManager,
+     MaintenanceController maintenance) {
+
+   this(
+       storage, schedulerCore, lockManager, sessionValidator, backup, recovery,
+       // The delegate declares maintenance before cronJobManager.
+       maintenance, cronJobManager,
+       KILL_TASK_INITIAL_BACKOFF.get(), KILL_TASK_MAX_BACKOFF.get());
+ }
+
+ /**
+ * Creates the thrift interface with explicit kill-task backoff bounds. Every argument must be
+ * non-null; a null argument fails with a {@link NullPointerException}.
+ * <p>
+ * Note that {@code maintenance} precedes {@code cronJobManager} here, the reverse of the
+ * injected constructor's parameter order.
+ *
+ * @param initialBackoff Initial backoff used by killTasks while waiting for tasks to die.
+ * @param maxBackoff Maximum backoff used by killTasks while waiting for tasks to die.
+ */
+ @VisibleForTesting
+ SchedulerThriftInterface(
+ Storage storage,
+ SchedulerCore schedulerCore,
+ LockManager lockManager,
+ CapabilityValidator sessionValidator,
+ StorageBackup backup,
+ Recovery recovery,
+ MaintenanceController maintenance,
+ CronJobManager cronJobManager,
+ Amount<Long, Time> initialBackoff,
+ Amount<Long, Time> maxBackoff) {
+
+ this.storage = checkNotNull(storage);
+ this.schedulerCore = checkNotNull(schedulerCore);
+ this.lockManager = checkNotNull(lockManager);
+ this.sessionValidator = checkNotNull(sessionValidator);
+ this.backup = checkNotNull(backup);
+ this.recovery = checkNotNull(recovery);
+ this.maintenance = checkNotNull(maintenance);
+ this.cronJobManager = checkNotNull(cronJobManager);
+ this.killTaskInitialBackoff = checkNotNull(initialBackoff);
+ this.killTaskMaxBackoff = checkNotNull(maxBackoff);
+ }
+
+ /**
+  * Creates a new job from the given configuration.
+  * <p>
+  * The session is authenticated against the job owner's role, the configuration is sanitized,
+  * and the optional lock is validated against the job key before the job is handed to the
+  * scheduler core.
+  *
+  * @param mutableJob Configuration of the job to create; must not be null.
+  * @param mutableLock Lock held by the caller for this job, if any.
+  * @param session Caller's session; must not be null.
+  * @return {@code OK} on success, {@code AUTH_FAILED} when authentication fails,
+  *     {@code LOCK_ERROR} when lock validation fails, or {@code INVALID_REQUEST} when the
+  *     configuration is rejected or the job cannot be scheduled.
+  */
+ @Override
+ public Response createJob(
+     JobConfiguration mutableJob,
+     @Nullable Lock mutableLock,
+     SessionKey session) {
+
+   // Explicit precondition on the configuration, in line with the checks on the other required
+   // arguments of this class's RPC methods.
+   checkNotNull(mutableJob);
+   IJobConfiguration job = IJobConfiguration.build(mutableJob);
+   IJobKey jobKey = JobKeys.assertValid(job.getKey());
+   checkNotNull(session);
+
+   Response response = new Response();
+
+   try {
+     sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+   } catch (AuthFailedException e) {
+     return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   }
+
+   try {
+     SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+
+     lockManager.validateIfLocked(
+         ILockKey.build(LockKey.job(jobKey.newBuilder())),
+         Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+
+     schedulerCore.createJob(sanitized);
+     response.setResponseCode(OK)
+         .setMessage(String.format("%d new tasks pending for job %s",
+             sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
+   } catch (LockException e) {
+     response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+   } catch (TaskDescriptionException | ScheduleException e) {
+     response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+   }
+
+   return response;
+ }
+
+ /**
+  * Replaces the stored template of an existing cron job.
+  *
+  * @param mutableConfig New configuration for the cron job; must not be null.
+  * @param mutableLock Lock held by the caller for this job, if any.
+  * @param session Caller's session; must not be null.
+  * @return {@code OK} when the template was replaced, {@code AUTH_FAILED} when authentication
+  *     fails, {@code LOCK_ERROR} when lock validation fails, or {@code INVALID_REQUEST} when the
+  *     configuration is rejected or no cron template exists for the job key.
+  */
+ @Override
+ public Response replaceCronTemplate(
+     JobConfiguration mutableConfig,
+     @Nullable Lock mutableLock,
+     SessionKey session) {
+
+   checkNotNull(mutableConfig);
+   IJobConfiguration job = IJobConfiguration.build(mutableConfig);
+   IJobKey jobKey = JobKeys.assertValid(job.getKey());
+   checkNotNull(session);
+
+   Response response = new Response();
+   try {
+     sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+   } catch (AuthFailedException e) {
+     return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   }
+
+   try {
+     ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+     Optional<ILock> lock = Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER);
+     lockManager.validateIfLocked(lockKey, lock);
+
+     SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+
+     if (cronJobManager.hasJob(jobKey)) {
+       cronJobManager.updateJob(sanitized);
+       response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
+     } else {
+       response.setResponseCode(INVALID_REQUEST).setMessage(
+           "No cron template found for the given key: " + jobKey);
+     }
+   } catch (LockException e) {
+     response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+   } catch (TaskDescriptionException | ScheduleException e) {
+     response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+   }
+   return response;
+ }
+
+ /**
+  * Sanitizes a job configuration and returns the task configurations it expands to.
+  *
+  * @param description Job configuration to populate; must not be null.
+  * @param validation Extra validation to perform; job resources are validated only when this is
+  *     {@code RUN_FILTERS}. May be null.
+  * @return {@code OK} with the populated task configs, or {@code INVALID_REQUEST} when the
+  *     configuration is rejected.
+  */
+ @Override
+ public Response populateJobConfig(JobConfiguration description, JobConfigValidation validation) {
+   checkNotNull(description);
+
+   Response response = new Response();
+   try {
+     IJobConfiguration job = IJobConfiguration.build(description);
+     SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+
+     // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
+     // A null validation never equals RUN_FILTERS, so no separate null test is needed.
+     if (validation == JobConfigValidation.RUN_FILTERS) {
+       schedulerCore.validateJobResources(sanitized);
+     }
+
+     PopulateJobResult populated = new PopulateJobResult()
+         .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values()));
+     return response.setResult(Result.populateJobResult(populated))
+         .setResponseCode(OK)
+         .setMessage("Tasks populated");
+   } catch (TaskDescriptionException | ScheduleException e) {
+     return response.setResponseCode(INVALID_REQUEST)
+         .setMessage("Invalid configuration: " + e.getMessage());
+   }
+ }
+
+ @Override
+ public Response startCronJob(JobKey mutableJobKey, SessionKey session) {
+ checkNotNull(session);
+ IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+
+ Response response = new Response();
+ try {
+ sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+ } catch (AuthFailedException e) {
+ response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+ return response;
+ }
+
+ try {
+ schedulerCore.startCronJob(jobKey);
+ response.setResponseCode(OK).setMessage("Cron run started.");
+ } catch (ScheduleException e) {
+ response.setResponseCode(INVALID_REQUEST)
+ .setMessage("Failed to start cron job - " + e.getMessage());
+ } catch (TaskDescriptionException e) {
+ response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
+ }
+
+ return response;
+ }
+
+ // TODO(William Farner): Provide status information about cron jobs here.
+ /**
+  * Looks up the tasks matching a query, using a weakly consistent read.
+  *
+  * @param query Task query to evaluate; must not be null.
+  * @return {@code OK} with the matching tasks, or {@code INVALID_REQUEST} when nothing matches.
+  */
+ @Override
+ public Response getTasksStatus(TaskQuery query) {
+   checkNotNull(query);
+
+   Set<IScheduledTask> matched =
+       Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query));
+
+   if (matched.isEmpty()) {
+     return new Response()
+         .setResponseCode(INVALID_REQUEST)
+         .setMessage("No tasks found for query: " + query);
+   }
+
+   ScheduleStatusResult statuses =
+       new ScheduleStatusResult().setTasks(IScheduledTask.toBuildersList(matched));
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.scheduleStatusResult(statuses));
+ }
+
+ /**
+  * Summarizes, per role, how many jobs have tasks in storage and how many cron jobs are
+  * registered with the cron job manager.
+  *
+  * @return {@code OK} with one {@link JobSummary} for every role that owns at least one task
+  *     or cron job.
+  */
+ @Override
+ public Response getJobSummary() {
+   Set<IScheduledTask> tasks = Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped());
+
+   // Multimaps.index keeps one entry per input element, so the job keys must be de-duplicated
+   // first; otherwise a job contributes once per task and the count below is a task count.
+   Multimap<String, IJobKey> jobsByRole = Multimaps.index(
+       FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet(),
+       JobKeys.TO_ROLE);
+
+   Multimap<String, IJobKey> cronJobsByRole = Multimaps.index(
+       FluentIterable.from(cronJobManager.getJobs()).transform(JobKeys.FROM_CONFIG),
+       JobKeys.TO_ROLE);
+
+   List<JobSummary> jobSummaries = Lists.newLinkedList();
+   for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) {
+     JobSummary summary = new JobSummary();
+     summary.setRole(role);
+     summary.setJobCount(jobsByRole.get(role).size());
+     summary.setCronJobCount(cronJobsByRole.get(role).size());
+     jobSummaries.add(summary);
+   }
+
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.jobSummaryResult(new JobSummaryResult(jobSummaries)));
+ }
+
+ /**
+  * Lists job configurations, optionally restricted to a single role.
+  * <p>
+  * Configurations for jobs with active tasks are synthesized from one of their tasks; cron job
+  * templates come from the cron job manager and replace any synthesized configuration that has
+  * the same job key.
+  *
+  * @param maybeNullRole Role to restrict the listing to, or null for all roles.
+  * @return {@code OK} with at most one configuration per job key.
+  */
+ @Override
+ public Response getJobs(@Nullable String maybeNullRole) {
+   Optional<String> ownerRole = Optional.fromNullable(maybeNullRole);
+
+   // Keyed by job key so that only one JobConfiguration is returned for each job.
+   Map<IJobKey, IJobConfiguration> configsByKey = Maps.newHashMap();
+
+   // Immediate jobs are not stored by the ImmediateJobManager (ImmediateJobManager#getJobs
+   // always returns an empty Collection), so they are found through their active tasks in the
+   // task store and a JobConfiguration is synthesized for each.
+   Query.Builder scope;
+   if (ownerRole.isPresent()) {
+     scope = Query.roleScoped(ownerRole.get());
+   } else {
+     scope = Query.unscoped();
+   }
+   Multimap<IJobKey, IScheduledTask> activeTasks =
+       Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, scope.active()));
+
+   Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration> synthesize =
+       new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() {
+         @Override
+         public IJobConfiguration transformEntry(
+             IJobKey jobKey,
+             Collection<IScheduledTask> jobTasks) {
+
+           // Any task will do as the template. It might not be the most recent one if the job
+           // is in the middle of an update or some shards have been selectively created.
+           TaskConfig template =
+               jobTasks.iterator().next().getAssignedTask().getTask().newBuilder();
+           return IJobConfiguration.build(new JobConfiguration()
+               .setKey(jobKey.newBuilder())
+               .setOwner(template.getOwner())
+               .setTaskConfig(template)
+               .setInstanceCount(jobTasks.size()));
+         }
+       };
+   configsByKey.putAll(Maps.transformEntries(activeTasks.asMap(), synthesize));
+
+   // Cron jobs are added after the task store results so that the real template
+   // JobConfiguration of a cron job overwrites a synthesized one created above.
+   Predicate<IJobConfiguration> roleFilter;
+   if (ownerRole.isPresent()) {
+     roleFilter = Predicates.compose(Predicates.equalTo(ownerRole.get()), JobKeys.CONFIG_TO_ROLE);
+   } else {
+     roleFilter = Predicates.<IJobConfiguration>alwaysTrue();
+   }
+   configsByKey.putAll(Maps.uniqueIndex(
+       FluentIterable.from(cronJobManager.getJobs()).filter(roleFilter),
+       JobKeys.FROM_CONFIG));
+
+   GetJobsResult result = new GetJobsResult()
+       .setConfigs(IJobConfiguration.toBuildersSet(configsByKey.values()));
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.getJobsResult(result));
+ }
+
+ /**
+  * Validates the given lock against the job of every task supplied.
+  *
+  * @param lock Lock presented by the caller, if any.
+  * @param tasks Tasks whose jobs must pass lock validation.
+  * @throws LockException If the lock manager rejects the lock for any of the jobs.
+  */
+ private void validateLockForTasks(Optional<ILock> lock, Iterable<IScheduledTask> tasks)
+     throws LockException {
+
+   // Collapse the tasks into distinct job keys so that each job is validated only once.
+   Set<IJobKey> jobKeys =
+       FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet();
+
+   for (IJobKey jobKey : jobKeys) {
+     ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+     lockManager.validateIfLocked(lockKey, lock);
+   }
+ }
+
+ /**
+  * Authenticates a session against every role touched by a task query.
+  *
+  * @param session Session to authenticate.
+  * @param taskQuery Query being executed; its owner role, when set, is always included.
+  * @param tasks Tasks matched by the query.
+  * @return The context of the authenticated session.
+  * @throws AuthFailedException If the session is not authenticated for the affected roles.
+  */
+ private SessionContext validateSessionKeyForTasks(
+     SessionKey session,
+     TaskQuery taskQuery,
+     Iterable<IScheduledTask> tasks) throws AuthFailedException {
+
+   ImmutableSet.Builder<String> roles = ImmutableSet.builder();
+   roles.addAll(Iterables.transform(tasks, GET_ROLE));
+
+   // A role-scoped query contributes its role even when no task matched. This papers over the
+   // implementation detail that dormant cron jobs are authenticated this way.
+   if (taskQuery.isSetOwner()) {
+     roles.add(taskQuery.getOwner().getRole());
+   }
+
+   return sessionValidator.checkAuthenticated(session, roles.build());
+ }
+
+ /**
+  * Checks whether the session is authorized for the {@code ROOT} capability.
+  *
+  * @param session Session to check.
+  * @return The session context when authorization succeeds, otherwise absent.
+  */
+ private Optional<SessionContext> isAdmin(SessionKey session) {
+   SessionContext rootContext;
+   try {
+     rootContext =
+         sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
+   } catch (AuthFailedException e) {
+     // Not an admin; callers fall back to their own per-role authentication.
+     return Optional.absent();
+   }
+   return Optional.of(rootContext);
+ }
+
+ /**
+ * Kills the tasks matching a query and waits, with backoff, until no active task matches.
+ * <p>
+ * A session holding the ROOT capability may kill any tasks; otherwise the session must be
+ * authenticated for every role affected by the query. The optional lock is validated against
+ * each affected job before the kill is issued.
+ *
+ * @param query Query selecting the tasks to kill; must not be null.
+ * @param mutablelock Lock held by the caller, may be null.
+ * @param session Caller's session; must not be null.
+ * @return {@code OK} once no active task matches, {@code INVALID_REQUEST} for a blank job name
+ * or a scheduling failure, {@code AUTH_FAILED}, {@code LOCK_ERROR}, or {@code ERROR} when the
+ * wait is interrupted or stopped.
+ */
+ @Override
+ public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) {
+ checkNotNull(query);
+ checkNotNull(session);
+
+ Response response = new Response();
+
+ // A job name that is set but blank is rejected; an unset (null) job name is allowed.
+ if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) {
+ response.setResponseCode(INVALID_REQUEST).setMessage(
+ String.format("Invalid job name: '%s'", query.getJobName()));
+ return response;
+ }
+
+ // The matched tasks determine which roles and jobs must pass authentication and locking.
+ Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query));
+
+ Optional<SessionContext> context = isAdmin(session);
+ if (context.isPresent()) {
+ LOG.info("Granting kill query to admin user: " + query);
+ } else {
+ try {
+ context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
+ } catch (AuthFailedException e) {
+ response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+ return response;
+ }
+ }
+
+ // context is always present here: either the admin check or the per-role check supplied it.
+ try {
+ validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks);
+ schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity());
+ } catch (LockException e) {
+ return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+ } catch (ScheduleException e) {
+ return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+ }
+
+ // TODO(William Farner): Move this into the client.
+ // NOTE(review): the meaning of the third BackoffHelper argument is not visible here;
+ // presumably it makes the helper give up at the max backoff, which would explain the
+ // BackoffStoppedException branch below - confirm.
+ BackoffHelper backoff = new BackoffHelper(killTaskInitialBackoff, killTaskMaxBackoff, true);
+ // NOTE(review): setStatuses is called on the caller-supplied query object itself, so any
+ // statuses the caller set are presumably replaced by ACTIVE_STATES for the wait - confirm
+ // this is intended.
+ final Query.Builder activeQuery = Query.arbitrary(query.setStatuses(Tasks.ACTIVE_STATES));
+ try {
+ // Poll until the query no longer matches any active task.
+ backoff.doUntilSuccess(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, activeQuery);
+ if (tasks.isEmpty()) {
+ LOG.info("Tasks all killed, done waiting.");
+ return true;
+ } else {
+ LOG.info("Jobs not yet killed, waiting...");
+ return false;
+ }
+ }
+ });
+ response.setResponseCode(OK).setMessage("Tasks killed.");
+ } catch (InterruptedException e) {
+ LOG.warning("Interrupted while trying to kill tasks: " + e);
+ // Restore the interrupt flag so callers further up the stack can observe it.
+ Thread.currentThread().interrupt();
+ response.setResponseCode(ERROR).setMessage("killTasks thread was interrupted.");
+ } catch (BackoffHelper.BackoffStoppedException e) {
+ response.setResponseCode(ERROR).setMessage("Tasks were not killed in time.");
+ }
+ return response;
+ }
+
+ /**
+  * Restarts the given shards of a job.
+  *
+  * @param mutableJobKey Key of the job whose shards are restarted; must be a valid job key.
+  * @param shardIds IDs of the shards to restart; must pass the blank check.
+  * @param mutableLock Lock held by the caller for this job, may be null.
+  * @param session Caller's session; must not be null.
+  * @return {@code OK} when the restart was accepted, {@code AUTH_FAILED} when authentication
+  *     against the job's role fails, {@code LOCK_ERROR} when lock validation fails, or
+  *     {@code INVALID_REQUEST} on a scheduling failure.
+  */
+ @Override
+ public Response restartShards(
+     JobKey mutableJobKey,
+     Set<Integer> shardIds,
+     Lock mutableLock,
+     SessionKey session) {
+
+   IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+   checkNotBlank(shardIds);
+   checkNotNull(session);
+
+   SessionContext context;
+   try {
+     context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+   } catch (AuthFailedException e) {
+     return new Response().setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   }
+
+   try {
+     lockManager.validateIfLocked(
+         ILockKey.build(LockKey.job(jobKey.newBuilder())),
+         Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+     schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
+   } catch (LockException e) {
+     return new Response().setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+   } catch (ScheduleException e) {
+     return new Response().setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+   }
+
+   return new Response().setResponseCode(OK).setMessage("Shards are restarting.");
+ }
+
+ /**
+  * Fetches the quota stored for a role, using a consistent read.
+  *
+  * @param ownerRole Role to look up; must not be blank.
+  * @return {@code OK} with the role's quota, or with {@code Quotas.noQuota()} when none is
+  *     stored.
+  */
+ @Override
+ public Response getQuota(final String ownerRole) {
+   checkNotBlank(ownerRole);
+
+   Work.Quiet<IQuota> fetchQuota = new Work.Quiet<IQuota>() {
+     @Override public IQuota apply(StoreProvider storeProvider) {
+       return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
+     }
+   };
+   IQuota quota = storage.consistentRead(fetchQuota);
+
+   GetQuotaResult result = new GetQuotaResult().setQuota(quota.newBuilder());
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.getQuotaResult(result));
+ }
+
+ /**
+  * Starts maintenance on the given hosts through the maintenance controller.
+  *
+  * @param hosts Hosts to put into maintenance.
+  * @param session Caller's session; not consulted by this method.
+  * @return {@code OK} with the statuses reported by the maintenance controller.
+  */
+ @Override
+ public Response startMaintenance(Hosts hosts, SessionKey session) {
+   StartMaintenanceResult result = new StartMaintenanceResult()
+       .setStatuses(maintenance.startMaintenance(hosts.getHostNames()));
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.startMaintenanceResult(result));
+ }
+
+ /**
+  * Drains the given hosts through the maintenance controller.
+  *
+  * @param hosts Hosts to drain.
+  * @param session Caller's session; not consulted by this method.
+  * @return {@code OK} with the statuses reported by the maintenance controller.
+  */
+ @Override
+ public Response drainHosts(Hosts hosts, SessionKey session) {
+   DrainHostsResult result = new DrainHostsResult()
+       .setStatuses(maintenance.drain(hosts.getHostNames()));
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.drainHostsResult(result));
+ }
+
+ /**
+  * Reports the maintenance status of the given hosts.
+  *
+  * @param hosts Hosts to query.
+  * @param session Caller's session; not consulted by this method.
+  * @return {@code OK} with the statuses reported by the maintenance controller.
+  */
+ @Override
+ public Response maintenanceStatus(Hosts hosts, SessionKey session) {
+   MaintenanceStatusResult result = new MaintenanceStatusResult()
+       .setStatuses(maintenance.getStatus(hosts.getHostNames()));
+   return new Response()
+       .setResponseCode(OK)
+       .setResult(Result.maintenanceStatusResult(result));
+ }
+
+ /**
+  * Ends maintenance on the named hosts.
+  *
+  * @param hosts Hosts to operate on.
+  * @param session Caller's session key; not inspected by this method.
+  * @return An OK response carrying the statuses returned by {@code maintenance.endMaintenance}.
+  */
+ @Override
+ public Response endMaintenance(Hosts hosts, SessionKey session) {
+   EndMaintenanceResult result = new EndMaintenanceResult()
+       .setStatuses(maintenance.endMaintenance(hosts.getHostNames()));
+
+   Response response = new Response();
+   response.setResponseCode(OK);
+   response.setResult(Result.endMaintenanceResult(result));
+   return response;
+ }
+
+ @Requires(whitelist = Capability.PROVISIONER)
+ @Override
+ public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
+ checkNotBlank(ownerRole);
+ checkNotNull(quota);
+ checkNotNull(session);
+
+ // TODO(Kevin Sweeney): Input validation for Quota.
+
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
+ }
+ });
+
+ return new Response().setResponseCode(OK).setMessage("Quota applied.");
+ }
+
+ /**
+  * Attempts to force a task into the given state on behalf of a ROOT-capable caller.
+  *
+  * @param taskId ID of the task to transition; must not be blank.
+  * @param status Status to move the task to; must not be null.
+  * @param session Caller's session key; must not be null.
+  * @return AUTH_FAILED if the caller is not authorized, otherwise OK.
+  */
+ @Override
+ public Response forceTaskState(
+     String taskId,
+     ScheduleStatus status,
+     SessionKey session) {
+
+   checkNotBlank(taskId);
+   checkNotNull(status);
+   checkNotNull(session);
+
+   SessionContext context;
+   try {
+     // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
+     context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
+   } catch (AuthFailedException e) {
+     return new Response().setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   }
+
+   // The transition message records which identity forced the change.
+   Optional<String> auditMessage = transitionMessage(context.getIdentity());
+   schedulerCore.setTaskStatus(Query.taskScoped(taskId), status, auditMessage);
+   return new Response().setResponseCode(OK).setMessage("Transition attempted.");
+ }
+
+ @Override
+ public Response performBackup(SessionKey session) {
+ backup.backupNow();
+ return new Response().setResponseCode(OK);
+ }
+
+ /**
+  * Lists the backups reported by {@code recovery.listBackups()}.
+  *
+  * @param session Caller's session key; not inspected by this method.
+  * @return An OK response carrying the backup list.
+  */
+ @Override
+ public Response listBackups(SessionKey session) {
+   ListBackupsResult result = new ListBackupsResult().setBackups(recovery.listBackups());
+
+   Response response = new Response();
+   response.setResponseCode(OK);
+   response.setResult(Result.listBackupsResult(result));
+   return response;
+ }
+
+ @Override
+ public Response stageRecovery(String backupId, SessionKey session) {
+ Response response = new Response().setResponseCode(OK);
+ try {
+ recovery.stage(backupId);
+ } catch (RecoveryException e) {
+ response.setResponseCode(ERROR).setMessage(e.getMessage());
+ LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e);
+ }
+
+ return response;
+ }
+
+ @Override
+ public Response queryRecovery(TaskQuery query, SessionKey session) {
+ Response response = new Response();
+ try {
+ response.setResponseCode(OK)
+ .setResult(Result.queryRecoveryResult(new QueryRecoveryResult()
+ .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
+ } catch (RecoveryException e) {
+ response.setResponseCode(ERROR).setMessage(e.getMessage());
+ LOG.log(Level.WARNING, "Failed to query recovery: " + e, e);
+ }
+
+ return response;
+ }
+
+ @Override
+ public Response deleteRecoveryTasks(TaskQuery query, SessionKey session) {
+ Response response = new Response().setResponseCode(OK);
+ try {
+ recovery.deleteTasks(Query.arbitrary(query));
+ } catch (RecoveryException e) {
+ response.setResponseCode(ERROR).setMessage(e.getMessage());
+ LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e);
+ }
+
+ return response;
+ }
+
+ /**
+  * Commits a recovery by invoking {@code recovery.commit()}.
+  *
+  * @param session Caller's session key; not inspected by this method.
+  * @return OK on success, or ERROR with the failure message if the commit failed.
+  */
+ @Override
+ public Response commitRecovery(SessionKey session) {
+   Response response = new Response().setResponseCode(OK);
+   try {
+     recovery.commit();
+   } catch (RecoveryException e) {
+     response.setResponseCode(ERROR).setMessage(e.getMessage());
+     // Record the failure and its stack trace server-side, matching the other recovery operations;
+     // otherwise a failed commit leaves no trace in the scheduler log.
+     LOG.log(Level.WARNING, "Failed to commit recovery: " + e, e);
+   }
+
+   return response;
+ }
+
+ @Override
+ public Response unloadRecovery(SessionKey session) {
+ recovery.unload();
+ return new Response().setResponseCode(OK);
+ }
+
+ @Override
+ public Response snapshot(SessionKey session) {
+ Response response = new Response();
+ try {
+ storage.snapshot();
+ return response.setResponseCode(OK).setMessage("Compaction successful.");
+ } catch (Storage.StorageException e) {
+ LOG.log(Level.WARNING, "Requested snapshot failed.", e);
+ return response.setResponseCode(ERROR).setMessage(e.getMessage());
+ }
+ }
+
+ /**
+  * Collects every stored job whose key equals {@code jobKey}, grouped by the manager ID it was
+  * fetched under.
+  *
+  * @param jobStore Store to scan.
+  * @param jobKey Key to match.
+  * @return Matching jobs, keyed by manager ID; empty if nothing matches.
+  */
+ private static Multimap<String, IJobConfiguration> jobsByKey(JobStore jobStore, IJobKey jobKey) {
+   ImmutableMultimap.Builder<String, IJobConfiguration> found = ImmutableMultimap.builder();
+   for (String managerId : jobStore.fetchManagerIds()) {
+     for (IJobConfiguration candidate : jobStore.fetchJobs(managerId)) {
+       if (!candidate.getKey().equals(jobKey)) {
+         continue;
+       }
+       found.put(managerId, candidate);
+     }
+   }
+   return found.build();
+ }
+
+ @Override
+ public Response rewriteConfigs(
+ final RewriteConfigsRequest request,
+ SessionKey session) {
+
+ if (request.getRewriteCommandsSize() == 0) {
+ return new Response()
+ .setResponseCode(ResponseCode.ERROR)
+ .setMessage("No rewrite commands provided.");
+ }
+
+ return storage.write(new MutateWork.Quiet<Response>() {
+ @Override public Response apply(MutableStoreProvider storeProvider) {
+ List<String> errors = Lists.newArrayList();
+
+ for (ConfigRewrite command : request.getRewriteCommands()) {
+ Optional<String> error = rewriteConfig(command, storeProvider);
+ if (error.isPresent()) {
+ errors.add(error.get());
+ }
+ }
+
+ Response resp = new Response();
+ if (!errors.isEmpty()) {
+ resp.setResponseCode(ResponseCode.WARNING).setMessage(Joiner.on(", ").join(errors));
+ } else {
+ resp.setResponseCode(OK).setMessage("All rewrites completed successfully.");
+ }
+ return resp;
+ }
+ });
+ }
+
+ /**
+ * Applies a single rewrite command against the given stores.
+ *
+ * <p>A job rewrite is rejected if it would change the job key or owner, if the key does not
+ * match exactly one stored job, or if the stored job differs from the caller-supplied old job.
+ * An instance rewrite is rejected if no active task is found for the instance, if the stored
+ * task config differs from the caller-supplied old task, or if the in-place modification
+ * reports no change.
+ *
+ * @param command Rewrite command to apply.
+ * @param storeProvider Mutable stores to read from and write to.
+ * @return An error message if the rewrite was rejected, absent if it was applied.
+ * @throws IllegalArgumentException If the command type is not handled.
+ */
+ private Optional<String> rewriteConfig(
+ ConfigRewrite command,
+ MutableStoreProvider storeProvider) {
+
+ Optional<String> error = Optional.absent();
+ switch (command.getSetField()) {
+ case JOB_REWRITE:
+ JobConfigRewrite jobRewrite = command.getJobRewrite();
+ IJobConfiguration existingJob = IJobConfiguration.build(jobRewrite.getOldJob());
+ IJobConfiguration rewrittenJob;
+ try {
+ rewrittenJob = ConfigurationManager.validateAndPopulate(
+ IJobConfiguration.build(jobRewrite.getRewrittenJob()));
+ } catch (TaskDescriptionException e) {
+ // We could add an error here, but this is probably a hint of something wrong in
+ // the client that's causing a bad configuration to be applied.
+ throw Throwables.propagate(e);
+ }
+ if (!existingJob.getKey().equals(rewrittenJob.getKey())) {
+ error = Optional.of("Disallowing rewrite attempting to change job key.");
+ } else if (!existingJob.getOwner().equals(rewrittenJob.getOwner())) {
+ error = Optional.of("Disallowing rewrite attempting to change job owner.");
+ } else {
+ JobStore.Mutable jobStore = storeProvider.getJobStore();
+ Multimap<String, IJobConfiguration> matches =
+ jobsByKey(jobStore, existingJob.getKey());
+ switch (matches.size()) {
+ case 0:
+ error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
+ break;
+
+ case 1:
+ Map.Entry<String, IJobConfiguration> match =
+ Iterables.getOnlyElement(matches.entries());
+ IJobConfiguration storedJob = match.getValue();
+ // Compare-and-set: only overwrite when the stored job still equals the old job the
+ // caller supplied.
+ if (!storedJob.equals(existingJob)) {
+ error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
+ } else {
+ jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
+ }
+ break;
+
+ default:
+ error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
+ }
+ }
+ break;
+
+ case INSTANCE_REWRITE:
+ InstanceConfigRewrite instanceRewrite = command.getInstanceRewrite();
+ InstanceKey instanceKey = instanceRewrite.getInstanceKey();
+ Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(
+ Query.instanceScoped(IJobKey.build(instanceKey.getJobKey()),
+ instanceKey.getInstanceId())
+ .active());
+ // NOTE(review): getOnlyElement throws IllegalArgumentException if more than one active task
+ // matches; presumably an instance has at most one active task - confirm.
+ Optional<IAssignedTask> task =
+ Optional.fromNullable(Iterables.getOnlyElement(tasks, null))
+ .transform(Tasks.SCHEDULED_TO_ASSIGNED);
+ if (!task.isPresent()) {
+ error = Optional.of("No active task found for " + instanceKey);
+ } else if (!task.get().getTask().newBuilder().equals(instanceRewrite.getOldTask())) {
+ error = Optional.of("CAS compare failed for " + instanceKey);
+ } else {
+ ITaskConfig newConfiguration = ITaskConfig.build(
+ ConfigurationManager.applyDefaultsIfUnset(instanceRewrite.getRewrittenTask()));
+ boolean changed = storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(
+ task.get().getTaskId(), newConfiguration);
+ if (!changed) {
+ error = Optional.of("Did not change " + task.get().getTaskId());
+ }
+ }
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unhandled command type " + command.getSetField());
+ }
+
+ return error;
+ }
+
+ @Override
+ public Response getVersion() {
+ return new Response()
+ .setResponseCode(OK)
+ .setResult(Result.getVersionResult(CURRENT_API_VERSION));
+ }
+
+ /**
+  * Adds instances to an existing, non-cron job.
+  *
+  * @param config Job key, instance IDs and task config for the new instances; must not be null.
+  * @param mutableLock Lock held by the caller on the job, if any.
+  * @param session Caller's session key; must not be null.
+  * @return AUTH_FAILED, LOCK_ERROR or INVALID_REQUEST on the corresponding failure, otherwise OK.
+  */
+ @Override
+ public Response addInstances(
+     AddInstancesConfig config,
+     @Nullable Lock mutableLock,
+     SessionKey session) {
+
+   checkNotNull(config);
+   checkNotNull(session);
+   checkNotBlank(config.getInstanceIds());
+   IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
+
+   Response response = new Response();
+   try {
+     sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+     ITaskConfig populatedTask = ConfigurationManager.validateAndPopulate(
+         ITaskConfig.build(config.getTaskConfig()));
+
+     if (cronJobManager.hasJob(jobKey)) {
+       response.setResponseCode(INVALID_REQUEST);
+       response.setMessage("Cron jobs are not supported here.");
+       return response;
+     }
+
+     ILockKey lockKey = ILockKey.build(LockKey.job(jobKey.newBuilder()));
+     lockManager.validateIfLocked(
+         lockKey,
+         Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+
+     schedulerCore.addInstances(
+         jobKey,
+         ImmutableSet.copyOf(config.getInstanceIds()),
+         populatedTask);
+     response.setResponseCode(OK);
+     response.setMessage("Successfully added instances.");
+   } catch (AuthFailedException e) {
+     response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   } catch (LockException e) {
+     response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
+   } catch (TaskDescriptionException e) {
+     response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+   } catch (ScheduleException e) {
+     response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
+   }
+   return response;
+ }
+
+ /**
+  * Extracts the role that owns the entity a lock key refers to.
+  *
+  * @param lockKey Lock key to inspect.
+  * @return Role of the job named by the key, after the job key has been validated.
+  * @throws IllegalArgumentException If the key's set field is anything other than JOB.
+  */
+ private String getRoleFromLockKey(ILockKey lockKey) {
+   switch (lockKey.getSetField()) {
+     case JOB:
+       return JobKeys.assertValid(lockKey.getJob()).getRole();
+     default:
+       throw new IllegalArgumentException("Unhandled LockKey: " + lockKey.getSetField());
+   }
+ }
+
+ /**
+  * Acquires a lock on behalf of the authenticated caller.
+  *
+  * @param mutableLockKey Key identifying what to lock; must not be null.
+  * @param session Caller's session key; must not be null.
+  * @return AUTH_FAILED or LOCK_ERROR on the corresponding failure, otherwise OK with the lock.
+  */
+ @Override
+ public Response acquireLock(LockKey mutableLockKey, SessionKey session) {
+   checkNotNull(mutableLockKey);
+   checkNotNull(session);
+
+   ILockKey lockKey = ILockKey.build(mutableLockKey);
+
+   try {
+     String role = getRoleFromLockKey(lockKey);
+     SessionContext context =
+         sessionValidator.checkAuthenticated(session, ImmutableSet.of(role));
+
+     ILock lock = lockManager.acquireLock(lockKey, context.getIdentity());
+     AcquireLockResult result = new AcquireLockResult().setLock(lock.newBuilder());
+
+     return new Response()
+         .setResult(Result.acquireLockResult(result))
+         .setResponseCode(OK)
+         .setMessage("Lock has been acquired.");
+   } catch (AuthFailedException e) {
+     return new Response().setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   } catch (LockException e) {
+     return new Response().setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
+   }
+ }
+
+ /**
+  * Releases a lock on behalf of the authenticated caller.
+  *
+  * @param mutableLock Lock to release; must not be null.
+  * @param validation Whether to validate the lock before releasing it; must not be null.
+  * @param session Caller's session key; must not be null.
+  * @return AUTH_FAILED or LOCK_ERROR on the corresponding failure, otherwise OK.
+  */
+ @Override
+ public Response releaseLock(Lock mutableLock, LockValidation validation, SessionKey session) {
+   checkNotNull(mutableLock);
+   checkNotNull(validation);
+   checkNotNull(session);
+
+   ILock lock = ILock.build(mutableLock);
+
+   try {
+     String role = getRoleFromLockKey(lock.getKey());
+     sessionValidator.checkAuthenticated(session, ImmutableSet.of(role));
+
+     // Only a CHECKED release validates the supplied lock first.
+     if (validation == LockValidation.CHECKED) {
+       lockManager.validateIfLocked(lock.getKey(), Optional.of(lock));
+     }
+     lockManager.releaseLock(lock);
+   } catch (AuthFailedException e) {
+     return new Response().setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+   } catch (LockException e) {
+     return new Response().setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
+   }
+
+   return new Response().setResponseCode(OK).setMessage("Lock has been released.");
+ }
+
+ /**
+  * Builds the audit message attached to a forced task transition.
+  *
+  * @param user Identity that forced the transition.
+  * @return The message, always present.
+  */
+ @VisibleForTesting
+ static Optional<String> transitionMessage(String user) {
+   String message = "Transition forced by " + user;
+   return Optional.of(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
new file mode 100644
index 0000000..c6e9b18
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Optional;
+
+/**
+ * Container for thrift server configuration options: the port to serve on and, optionally,
+ * the SSL key material for the server socket.
+ */
+public interface ThriftConfiguration {
+ /**
+ * Gets a stream for the thrift socket SSL key if this server is configured to use SSL.
+ *
+ * @return A stream that contains the SSL key data if SSL is enabled, absent otherwise.
+ * @throws IOException If the stream could not be opened.
+ */
+ Optional<? extends InputStream> getSslKeyStream() throws IOException;
+
+ /**
+ * Gets the port that the thrift server should listen on.
+ *
+ * @return Thrift server port.
+ */
+ int getServingPort();
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
new file mode 100644
index 0000000..cca9053
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.scheduler.thrift.aop.AopModule;
+import com.twitter.common.application.http.Registration;
+import com.twitter.common.application.modules.LifecycleModule;
+
+/**
+ * Guice module that binds the scheduler's thrift interface, its server and launcher, the
+ * {@code /api} servlet, and the AOP decorations from {@link AopModule}.
+ */
+public class ThriftModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bindThriftServer();
+
+    Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
+
+    install(new AopModule());
+  }
+
+  /**
+   * Binds the thrift interface implementation, the singleton server, and the service runner
+   * that launches it.
+   */
+  private void bindThriftServer() {
+    bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
+    bind(ThriftServer.class).in(Singleton.class);
+    LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
new file mode 100644
index 0000000..7b9abd1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.net.ServerSocket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+
+import com.twitter.thrift.Status;
+
+/**
+ * Owns the lifecycle of a thrift {@link TThreadPoolServer}: starting it on a dedicated
+ * non-daemon thread and stopping it on request. All state changes are guarded by this object's
+ * monitor.
+ */
+class ThriftServer {
+  private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
+
+  private TServer server = null;
+
+  // Current health status of the server.
+  private Status status = Status.STARTING;
+
+  /**
+   * Starts the server.
+   * This may be called at any point except when the server is already alive. That is, it's
+   * allowable to start, stop, and re-start the server.
+   *
+   * @param socket The socket to use.
+   * @param processor The processor to handle requests.
+   * @throws IllegalStateException If the server is currently alive.
+   */
+  public synchronized void start(ServerSocket socket, TProcessor processor) {
+    Preconditions.checkNotNull(socket);
+    Preconditions.checkNotNull(processor);
+    Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
+
+    // Construct the server before marking it ALIVE. If construction throws, the status is left
+    // untouched, so a later start() is not rejected by the check above for a server that never
+    // came up.
+    TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket))
+        .processor(processor)
+        .protocolFactory(new TBinaryProtocol.Factory(false, true));
+
+    final TServer starting = new TThreadPoolServer(args);
+    server = starting;
+    setStatus(Status.ALIVE);
+    LOG.info("Starting thrift server on port " + socket.getLocalPort());
+
+    Thread listeningThread = new ThreadFactoryBuilder().setDaemon(false).build()
+        .newThread(new Runnable() {
+          @Override public void run() {
+            try {
+              starting.serve();
+            } catch (Throwable t) {
+              LOG.log(Level.WARNING,
+                  "Uncaught exception while attempting to handle service requests: " + t, t);
+              setStatus(Status.DEAD);
+            }
+          }
+        });
+
+    listeningThread.start();
+  }
+
+  /**
+   * Records and logs a status transition.
+   *
+   * @param status The new status.
+   */
+  private synchronized void setStatus(Status status) {
+    LOG.info("Moving from status " + this.status + " to " + status);
+    this.status = status;
+  }
+
+  /**
+   * Attempts to shut down the server.
+   * The server may be shut down at any time, though the request will be ignored if the server is
+   * already stopped.
+   */
+  public synchronized void shutdown() {
+    if (status == Status.STOPPED) {
+      LOG.info("Server already stopped, shutdown request ignored.");
+      return;
+    }
+
+    LOG.info("Received shutdown request, stopping server.");
+    setStatus(Status.STOPPING);
+
+    // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
+    // is stopped. Might want to allow a sleep period for the active requests to be completed.
+
+    if (server != null) {
+      server.stop();
+    }
+
+    server = null;
+    setStatus(Status.STOPPED);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
new file mode 100644
index 0000000..6743060
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.AuroraAdmin.Iface;
+import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
+import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
+import com.twitter.common.base.Command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Service launcher that starts up and registers the scheduler thrift server as a primary service
+ * for the application.
+ */
+class ThriftServerLauncher implements ServiceRunner {
+
+  private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
+
+  private final ThriftConfiguration configuration;
+
+  // Security is enforced via file permissions, not via this password, for what it's worth.
+  private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
+
+  private final Iface schedulerThriftInterface;
+  private final ThriftServer schedulerThriftServer;
+
+  @Inject
+  ThriftServerLauncher(
+      Iface schedulerThriftInterface,
+      ThriftServer schedulerThriftServer,
+      ThriftConfiguration configuration) {
+
+    this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
+    this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
+    this.configuration = checkNotNull(configuration);
+  }
+
+  /**
+   * Opens the server socket, starts the thrift server on it, and returns a primary-service
+   * handle whose shutdown command stops the server.
+   *
+   * @return The launched service, registered on the socket's local port.
+   */
+  @Override
+  public LocalService launch() {
+    ServerSocket socket = getServerSocket();
+    schedulerThriftServer.start(
+        socket,
+        new AuroraAdmin.Processor<>(schedulerThriftInterface));
+
+    Command shutdown = new Command() {
+      @Override public void execute() {
+        LOG.info("Stopping thrift server.");
+        schedulerThriftServer.shutdown();
+      }
+    };
+
+    return LocalService.primaryService(socket.getLocalPort(), shutdown);
+  }
+
+  /**
+   * Creates the socket the thrift server listens on: a plain socket when no SSL key is
+   * configured, otherwise an SSL socket backed by the configured JKS key store.
+   *
+   * @return A server socket bound to the configured serving port.
+   * @throws RuntimeException If the key data cannot be read, the socket cannot be opened, or
+   *     SSL setup fails.
+   */
+  private ServerSocket getServerSocket() {
+    try {
+      Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
+      if (!sslKeyStream.isPresent()) {
+        LOG.warning("Running Thrift Server without SSL.");
+        return new ServerSocket(configuration.getServingPort());
+      } else {
+        // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
+        KeyStore ks;
+        // KeyStore.load leaves the stream open, so close it here once the keys are read (or
+        // loading fails) rather than leaking the underlying handle.
+        try (InputStream keyData = sslKeyStream.get()) {
+          ks = KeyStore.getInstance("JKS");
+          ks.load(keyData, SSL_KEYFILE_PASSWORD.toCharArray());
+        }
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
+
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(kmf.getKeyManagers(), null, null);
+
+        SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
+        SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
+            configuration.getServingPort());
+        // NOTE(review): this enables every suite the JVM supports, which may include suites that
+        // are disabled by default for being weak - confirm this is intended before tightening.
+        serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
+        serverSocket.setNeedClientAuth(false);
+        return serverSocket;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read key file.", e);
+    } catch (GeneralSecurityException e) {
+      throw new RuntimeException("SSL setup failed.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
new file mode 100644
index 0000000..d66a2b2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
@@ -0,0 +1,20 @@
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.gen.Response;
+
+import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
+
+class APIVersionInterceptor implements MethodInterceptor {
+
+ @Override
+ public Object invoke(MethodInvocation invocation) throws Throwable {
+ Response resp = (Response) invocation.proceed();
+ if (resp.version == null) {
+ resp.setVersion(CURRENT_API_VERSION);
+ }
+ return resp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
new file mode 100644
index 0000000..4afc263
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.google.inject.matcher.Matcher;
+import com.google.inject.matcher.Matchers;
+
+import org.aopalliance.intercept.MethodInterceptor;
+
+import com.twitter.aurora.GuiceUtils;
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.AuroraSchedulerManager;
+import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+
+/**
+ * Binding module for AOP-style decorations of the thrift API.
+ */
+public class AopModule extends AbstractModule {
+
+ @CmdLine(name = "enable_job_updates", help = "Whether new job updates should be accepted.")
+ private static final Arg<Boolean> ENABLE_UPDATES = Arg.create(true);
+
+ @CmdLine(name = "enable_job_creation",
+ help = "Allow new jobs to be created, if false all job creation requests will be denied.")
+ private static final Arg<Boolean> ENABLE_JOB_CREATION = Arg.create(true);
+
+ private static final Matcher<? super Class<?>> THRIFT_IFACE_MATCHER =
+ Matchers.subclassesOf(AuroraAdmin.Iface.class)
+ .and(Matchers.annotatedWith(DecoratedThrift.class));
+
+ private final Map<String, Boolean> toggledMethods;
+
+ public AopModule() {
+ this(ImmutableMap.of(
+ "createJob", ENABLE_JOB_CREATION.get(),
+ "acquireLock", ENABLE_UPDATES.get()));
+ }
+
+ @VisibleForTesting
+ AopModule(Map<String, Boolean> toggledMethods) {
+ this.toggledMethods = ImmutableMap.copyOf(toggledMethods);
+ }
+
+ private static final Function<Method, String> GET_NAME = new Function<Method, String>() {
+ @Override public String apply(Method method) {
+ return method.getName();
+ }
+ };
+
+ @Override
+ protected void configure() {
+ requireBinding(CapabilityValidator.class);
+
+ // Layer ordering:
+ // Log -> CapabilityValidator -> FeatureToggle -> StatsExporter -> APIVersion ->
+ // SchedulerThriftInterface
+
+ // TODO(Sathya): Consider using provider pattern for constructing interceptors to facilitate
+ // unit testing without the creation of Guice injectors.
+ bindThriftDecorator(new LoggingInterceptor());
+
+ // Note: it's important that the capability interceptor is only applied to AuroraAdmin.Iface
+ // methods, and does not pick up methods on AuroraSchedulerManager.Iface.
+ MethodInterceptor authInterceptor = new UserCapabilityInterceptor();
+ requestInjection(authInterceptor);
+ bindInterceptor(
+ THRIFT_IFACE_MATCHER,
+ GuiceUtils.interfaceMatcher(AuroraAdmin.Iface.class, true),
+ authInterceptor);
+
+ install(new PrivateModule() {
+ @Override protected void configure() {
+ // Ensure that the provided methods exist on the decorated interface.
+ List<Method> methods =
+ ImmutableList.copyOf(AuroraSchedulerManager.Iface.class.getMethods());
+ for (String toggledMethod : toggledMethods.keySet()) {
+ Preconditions.checkArgument(
+ Iterables.any(methods,
+ Predicates.compose(Predicates.equalTo(toggledMethod), GET_NAME)),
+ String.format("Method %s was not found in class %s",
+ toggledMethod,
+ AuroraSchedulerManager.Iface.class));
+ }
+
+ bind(new TypeLiteral<Map<String, Boolean>>() { }).toInstance(toggledMethods);
+ bind(IsFeatureEnabled.class).in(Singleton.class);
+ Key<Predicate<Method>> predicateKey = Key.get(new TypeLiteral<Predicate<Method>>() { });
+ bind(predicateKey).to(IsFeatureEnabled.class);
+ expose(predicateKey);
+ }
+ });
+ bindThriftDecorator(new FeatureToggleInterceptor());
+ bindThriftDecorator(new ThriftStatsExporterInterceptor());
+ bindThriftDecorator(new APIVersionInterceptor());
+ }
+
+ private void bindThriftDecorator(MethodInterceptor interceptor) {
+ bindThriftDecorator(binder(), THRIFT_IFACE_MATCHER, interceptor);
+ }
+
+ @VisibleForTesting
+ static void bindThriftDecorator(
+ Binder binder,
+ Matcher<? super Class<?>> classMatcher,
+ MethodInterceptor interceptor) {
+
+ binder.bindInterceptor(classMatcher, Matchers.any(), interceptor);
+ binder.requestInjection(interceptor);
+ }
+
+ private static class IsFeatureEnabled implements Predicate<Method> {
+ private final Predicate<String> methodEnabled;
+
+ @Inject
+ IsFeatureEnabled(Map<String, Boolean> toggleMethods) {
+ Predicate<String> builder = Predicates.alwaysTrue();
+ for (Map.Entry<String, Boolean> toggleMethod : toggleMethods.entrySet()) {
+ Predicate<String> enableMethod = Predicates.or(
+ toggleMethod.getValue()
+ ? Predicates.<String>alwaysTrue()
+ : Predicates.<String>alwaysFalse(),
+ Predicates.not(Predicates.equalTo(toggleMethod.getKey())));
+ builder = Predicates.and(builder, enableMethod);
+ }
+ methodEnabled = builder;
+ }
+
+ @Override
+ public boolean apply(Method method) {
+ return methodEnabled.apply(method.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
new file mode 100644
index 0000000..03c3d99
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Predicate;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.gen.ResponseCode;
+
+/**
+ * A method interceptor that blocks access to features based on a supplied predicate.
+ */
+public class FeatureToggleInterceptor implements MethodInterceptor {
+
+ @Inject private Predicate<Method> allowMethod;
+
+ @Override
+ public Object invoke(MethodInvocation invocation) throws Throwable {
+ Method method = invocation.getMethod();
+ if (!allowMethod.apply(method)) {
+ return Interceptors.properlyTypedResponse(
+ method,
+ ResponseCode.ERROR,
+ "The " + method.getName() + " feature is currently disabled on this scheduler.");
+ } else {
+ return invocation.proceed();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
new file mode 100644
index 0000000..d0cb9c1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/Interceptors.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.logging.Logger;
+
+import com.google.common.base.Throwables;
+
+import com.twitter.aurora.gen.ResponseCode;
+
+/**
+ * Reflection helpers shared by interceptors that wrap the thrift interface.
+ */
+final class Interceptors {
+
+  private static final Logger LOG = Logger.getLogger(Interceptors.class.getName());
+
+  private Interceptors() {
+    // Utility class.
+  }
+
+  /**
+   * Creates an instance of {@code method}'s return type via its no-argument constructor and
+   * populates it by reflectively calling {@code setResponseCode} and {@code setMessage}.
+   *
+   * @param method Intercepted method whose return type determines the response class.
+   * @param responseCode Code to store in the response.
+   * @param message Message to store in the response.
+   * @return The populated response object.
+   * @throws IllegalAccessException If the return type or its constructor is not accessible.
+   * @throws InstantiationException If the return type cannot be instantiated.
+   */
+  static Object properlyTypedResponse(Method method, ResponseCode responseCode, String message)
+      throws IllegalAccessException, InstantiationException {
+
+    Class<?> responseType = method.getReturnType();
+    Object result = responseType.newInstance();
+    invoke(responseType, result, "setResponseCode", ResponseCode.class, responseCode);
+    invoke(responseType, result, "setMessage", String.class, message);
+    return result;
+  }
+
+  /**
+   * Looks up the public single-argument method {@code name} on {@code type} and calls it on
+   * {@code obj}. Any reflection failure is logged and rethrown via
+   * {@link Throwables#propagate(Throwable)}.
+   */
+  private static <T> void invoke(
+      Class<?> type,
+      Object obj,
+      String name,
+      Class<T> parameterType,
+      T argument) {
+
+    try {
+      Method setter = type.getMethod(name, parameterType);
+      setter.invoke(obj, argument);
+    } catch (NoSuchMethodException e) {
+      LOG.severe(type + " does not support " + name);
+      throw Throwables.propagate(e);
+    } catch (IllegalAccessException e) {
+      LOG.severe("Method " + name + " is not accessible in " + type);
+      throw Throwables.propagate(e);
+    } catch (InvocationTargetException e) {
+      LOG.severe("Failed to invoke " + name + " on " + type);
+      throw Throwables.propagate(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
new file mode 100644
index 0000000..5f773cc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.gen.ExecutorConfig;
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.SessionKey;
+
+import static com.twitter.aurora.scheduler.thrift.aop.Interceptors.properlyTypedResponse;
+
+/**
+ * A method interceptor that logs each invocation together with its arguments, and turns any
+ * {@link RuntimeException} thrown by the underlying call into a logged warning plus an
+ * {@code ERROR} response.
+ */
+class LoggingInterceptor implements MethodInterceptor {
+
+  private static final Logger LOG = Logger.getLogger(LoggingInterceptor.class.getName());
+
+  @Inject private CapabilityValidator validator;
+
+  // Custom renderers, looked up by the exact runtime class of an argument.
+  // TODO(wfarner): Scrub updateToken when it is identifiable by type.
+  private final Map<Class<?>, Function<Object, String>> printFunctions =
+      ImmutableMap.<Class<?>, Function<Object, String>>of(
+          JobConfiguration.class,
+          new Function<Object, String>() {
+            @Override public String apply(Object input) {
+              // Work on a copy so the caller's configuration is left untouched.
+              JobConfiguration scrubbed = ((JobConfiguration) input).deepCopy();
+              if (scrubbed.isSetTaskConfig()) {
+                ExecutorConfig blanked = new ExecutorConfig("BLANKED", "BLANKED");
+                scrubbed.getTaskConfig().setExecutorConfig(blanked);
+              }
+              return scrubbed.toString();
+            }
+          },
+          SessionKey.class,
+          new Function<Object, String>() {
+            @Override public String apply(Object input) {
+              return validator.toString((SessionKey) input);
+            }
+          }
+      );
+
+  /**
+   * Logs the call as {@code method(arg, ...)} and proceeds. A {@link RuntimeException} from the
+   * call is logged at WARNING and answered with an {@code ERROR} response carrying the
+   * exception's message.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    List<String> rendered = Lists.newArrayList();
+    for (Object arg : invocation.getArguments()) {
+      rendered.add(render(arg));
+    }
+    String methodName = invocation.getMethod().getName();
+    String message = String.format("%s(%s)", methodName, Joiner.on(", ").join(rendered));
+    LOG.info(message);
+    try {
+      return invocation.proceed();
+    } catch (RuntimeException e) {
+      LOG.log(Level.WARNING, "Uncaught exception while handling " + message, e);
+      return properlyTypedResponse(invocation.getMethod(), ResponseCode.ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Renders one argument for the log line, using a custom renderer when one is registered for
+   * the argument's class and {@code toString()} otherwise.
+   */
+  private String render(Object arg) {
+    if (arg == null) {
+      return "null";
+    }
+    Function<Object, String> printer = printFunctions.get(arg.getClass());
+    if (printer != null) {
+      return printer.apply(arg);
+    }
+    return arg.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
new file mode 100644
index 0000000..d700ab5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.stats.Stats;
+
+/**
+ * A method interceptor that records how long each thrift call takes, keeping one
+ * {@link SlidingStats} per intercepted method.
+ */
+class ThriftStatsExporterInterceptor implements MethodInterceptor {
+
+  // Stats are created on first use of a method and named "scheduler_thrift_<method name>".
+  private final LoadingCache<Method, SlidingStats> stats =
+      CacheBuilder.newBuilder().build(new CacheLoader<Method, SlidingStats>() {
+        @Override public SlidingStats load(Method method) {
+          String rawName = String.format("scheduler_thrift_%s", method.getName());
+          return new SlidingStats(Stats.normalizeName(rawName), "nanos");
+        }
+      });
+
+  /**
+   * Proceeds with the call and accumulates its elapsed time, in nanoseconds, into the method's
+   * stat. The time is recorded whether the call returns or throws.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    SlidingStats timing = stats.get(invocation.getMethod());
+    long startNanos = System.nanoTime();
+    try {
+      return invocation.proceed();
+    } finally {
+      long elapsedNanos = System.nanoTime() - startNanos;
+      timing.accumulate(elapsedNanos);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
new file mode 100644
index 0000000..d9240bc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.aop;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
+import com.twitter.aurora.gen.ResponseCode;
+import com.twitter.aurora.gen.SessionKey;
+import com.twitter.aurora.scheduler.thrift.auth.Requires;
+
+/**
+ * A method interceptor that will authenticate users identified by a {@link SessionKey} argument
+ * to invoked methods.
+ * <p>
+ * Intercepted methods will require {@link Capability#ROOT}, but additional capabilities
+ * may be specified by annotating methods with {@link Requires} and supplying a whitelist.
+ * <p>
+ * A method that declares a {@link SessionKey} parameter but is invoked without one (for example
+ * with a {@code null} key) is rejected with {@link ResponseCode#AUTH_FAILED} rather than being
+ * allowed through unchecked.
+ */
+class UserCapabilityInterceptor implements MethodInterceptor {
+  private static final Logger LOG = Logger.getLogger(UserCapabilityInterceptor.class.getName());
+
+  @Inject private CapabilityValidator capabilityValidator;
+
+  private static final Function<Object, SessionKey> CAST = new Function<Object, SessionKey>() {
+    @Override public SessionKey apply(Object o) {
+      return (SessionKey) o;
+    }
+  };
+
+  /**
+   * Checks the caller's session key against each whitelisted capability in turn and proceeds
+   * with the call on the first successful check.
+   *
+   * @param invocation The intercepted call.
+   * @return The result of the underlying call when authorized (or when the method declares no
+   *     {@link SessionKey} parameter), otherwise a response of the method's return type carrying
+   *     {@link ResponseCode#AUTH_FAILED}.
+   * @throws Throwable Anything thrown by the underlying call or by response construction.
+   */
+  @Override
+  public Object invoke(MethodInvocation invocation) throws Throwable {
+    Preconditions.checkNotNull(capabilityValidator, "Session validator has not yet been set.");
+
+    // Ensure ROOT is always permitted.
+    ImmutableList.Builder<Capability> whitelistBuilder =
+        ImmutableList.<Capability>builder().add(Capability.ROOT);
+
+    Method method = invocation.getMethod();
+    Requires requires = method.getAnnotation(Requires.class);
+    if (requires != null) {
+      whitelistBuilder.add(requires.whitelist());
+    }
+
+    List<Capability> whitelist = whitelistBuilder.build();
+    LOG.fine("Operation " + method.getName() + " may be performed by: " + whitelist);
+    Optional<SessionKey> sessionKey = FluentIterable.from(Arrays.asList(invocation.getArguments()))
+        .firstMatch(Predicates.instanceOf(SessionKey.class)).transform(CAST);
+    if (!sessionKey.isPresent()) {
+      if (Arrays.asList(method.getParameterTypes()).contains(SessionKey.class)) {
+        // The method expects a session key but the caller supplied none (e.g. null). Proceeding
+        // here would skip the capability check entirely, so fail closed instead.
+        LOG.warning("Rejecting call to " + method.getName() + " made without a session key.");
+        return Interceptors.properlyTypedResponse(
+            method,
+            ResponseCode.AUTH_FAILED,
+            "A session key is required to perform this action.");
+      }
+
+      // Misconfiguration: the interceptor was bound to a method with no SessionKey parameter.
+      LOG.severe("Interceptor should only be applied to methods accepting a SessionKey, but "
+          + method + " does not.");
+      return invocation.proceed();
+    }
+
+    String key = capabilityValidator.toString(sessionKey.get());
+    for (Capability user : whitelist) {
+      LOG.fine("Attempting to validate " + key + " against " + user);
+      try {
+        capabilityValidator.checkAuthorized(sessionKey.get(), user, AuditCheck.NONE);
+
+        LOG.info("Permitting " + key + " to act as "
+            + user + " and perform action " + method.getName());
+        return invocation.proceed();
+      } catch (AuthFailedException e) {
+        // Not authorized for this capability; fall through and try the next one.
+        LOG.fine("Auth failed: " + e);
+      }
+    }
+
+    // User is not permitted to perform this operation.
+    return Interceptors.properlyTypedResponse(
+        method,
+        ResponseCode.AUTH_FAILED,
+        "Session identified by '" + key
+            + "' does not have the required capability to perform this action: " + whitelist);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java b/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
new file mode 100644
index 0000000..4a667c5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/auth/DecoratedThrift.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.auth;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Marks a thrift interface implementation that should be decorated with additional
+ * functionality. May be placed on types and on parameters, and is retained at runtime.
+ */
+@Retention(RUNTIME)
+@Target({PARAMETER, TYPE})
+public @interface DecoratedThrift {
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java b/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
new file mode 100644
index 0000000..0fff3f6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/auth/Requires.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.auth;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Method-level annotation naming the capabilities that may perform an action, so that users
+ * holding non-ROOT capabilities can be permitted. Retained at runtime.
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+public @interface Requires {
+  /**
+   * The capabilities accepted for the annotated action. Defaults to {@link Capability#ROOT}
+   * alone.
+   */
+  Capability[] whitelist() default Capability.ROOT;
+}