You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:33 UTC
[40/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
deleted file mode 100644
index 438c023..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.ReadWriteLockManager;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A {@link Storage} implementation composed of individual in-memory store implementations.
- * <p>
- * A single global read-write lock is used when invoking {@link #consistentRead(Work)} and
- * {@link #write(MutateWork)}. No lock is taken at this level for
- * {@link #weaklyConsistentRead(Work)}; it is the responsibility of the individual stores to
- * ensure that read operations are thread-safe (optimally supporting concurrency). Store
- * implementations may assume that all methods invoked on {@code Mutable} store interfaces are
- * protected by the global write lock, and thus invoked serially.
- */
-public class MemStorage implements Storage {
-  // Running totals of the time spent acquiring the global lock, exported under these stat names.
-  private final AtomicLong readLockWaitNanos = Stats.exportLong("read_lock_wait_nanos");
-  private final AtomicLong writeLockWaitNanos = Stats.exportLong("write_lock_wait_nanos");
-
-  private final MutableStoreProvider storeProvider;
-  private final ReadWriteLockManager lockManager = new ReadWriteLockManager();
-
-  /**
-   * Creates a storage whose store provider hands out exactly the given store instances.
-   *
-   * @param schedulerStore Store returned by {@code getSchedulerStore()}.
-   * @param jobStore Store returned by {@code getJobStore()}.
-   * @param taskStore Store returned by both {@code getTaskStore()} and
-   *     {@code getUnsafeTaskStore()}.
-   * @param lockStore Store returned by {@code getLockStore()}.
-   * @param quotaStore Store returned by {@code getQuotaStore()}.
-   * @param attributeStore Store returned by {@code getAttributeStore()}.
-   */
-  @Inject
-  MemStorage(
-      final SchedulerStore.Mutable schedulerStore,
-      final JobStore.Mutable jobStore,
-      final TaskStore.Mutable taskStore,
-      final LockStore.Mutable lockStore,
-      final QuotaStore.Mutable quotaStore,
-      final AttributeStore.Mutable attributeStore) {
-
-    storeProvider = new MutableStoreProvider() {
-      @Override public SchedulerStore.Mutable getSchedulerStore() {
-        return schedulerStore;
-      }
-
-      @Override public JobStore.Mutable getJobStore() {
-        return jobStore;
-      }
-
-      // Same instance as getUnsafeTaskStore(), exposed through the narrower TaskStore type.
-      @Override public TaskStore getTaskStore() {
-        return taskStore;
-      }
-
-      @Override public TaskStore.Mutable getUnsafeTaskStore() {
-        return taskStore;
-      }
-
-      @Override public LockStore.Mutable getLockStore() {
-        return lockStore;
-      }
-
-      @Override public QuotaStore.Mutable getQuotaStore() {
-        return quotaStore;
-      }
-
-      @Override public AttributeStore.Mutable getAttributeStore() {
-        return attributeStore;
-      }
-    };
-  }
-
-  /**
-   * Creates a new empty in-memory storage for use in testing.
-   *
-   * @return A storage backed by freshly constructed in-memory stores.
-   */
-  @VisibleForTesting
-  public static MemStorage newEmptyStorage() {
-    return new MemStorage(
-        new MemSchedulerStore(),
-        new MemJobStore(),
-        new MemTaskStore(),
-        new MemLockStore(),
-        new MemQuotaStore(),
-        new MemAttributeStore());
-  }
-
-  /**
-   * Runs {@code work} against the stores while holding the global read lock.
-   *
-   * @param work Unit of work to execute; must not be {@code null}.
-   * @return Whatever {@code work} returns.
-   * @throws E If {@code work} throws it.
-   */
-  @Timed("mem_storage_consistent_read_operation")
-  @Override
-  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    checkNotNull(work);
-
-    long lockStartNanos = System.nanoTime();
-    boolean topLevelOperation = lockManager.readLock();
-    // The wait is only recorded when the lock manager flags this acquisition as top-level.
-    // NOTE(review): presumably false for nested acquisitions by the same thread - confirm
-    // against ReadWriteLockManager.
-    if (topLevelOperation) {
-      readLockWaitNanos.addAndGet(System.nanoTime() - lockStartNanos);
-    }
-    try {
-      return work.apply(storeProvider);
-    } finally {
-      lockManager.readUnlock();
-    }
-  }
-
-  /**
-   * Runs {@code work} against the stores without taking any lock at this level.
-   *
-   * @param work Unit of work to execute; must not be {@code null}.
-   * @return Whatever {@code work} returns.
-   * @throws E If {@code work} throws it.
-   */
-  @Timed("mem_storage_weakly_consistent_read_operation")
-  @Override
-  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
-      throws StorageException, E {
-
-    // Validate up front, exactly like consistentRead() and write() do.
-    checkNotNull(work);
-
-    return work.apply(storeProvider);
-  }
-
-  /**
-   * Runs {@code work} against the mutable stores while holding the global write lock.
-   *
-   * @param work Unit of work to execute; must not be {@code null}.
-   * @return Whatever {@code work} returns.
-   * @throws E If {@code work} throws it.
-   */
-  @Timed("mem_storage_write_operation")
-  @Override
-  public <T, E extends Exception> T write(MutateWork<T, E> work)
-      throws StorageException, E {
-
-    checkNotNull(work);
-
-    long lockStartNanos = System.nanoTime();
-    boolean topLevelOperation = lockManager.writeLock();
-    // Same accounting rule as consistentRead(): only top-level acquisitions are measured.
-    if (topLevelOperation) {
-      writeLockWaitNanos.addAndGet(System.nanoTime() - lockStartNanos);
-    }
-    try {
-      return work.apply(storeProvider);
-    } finally {
-      lockManager.writeUnlock();
-    }
-  }
-
-  /**
-   * Does nothing; this storage has no snapshot behavior.
-   */
-  @Override
-  public void snapshot() {
-    // No-op.
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
deleted file mode 100644
index a99aa12..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemStorageModule.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.Volatile;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Binding module for an in-memory storage system.
- * <p>
- * Exposes bindings for storage components:
- * <ul>
- *   <li>{@link com.twitter.aurora.scheduler.storage.Storage}, both under the key created by the
- *       supplied {@code keyFactory} and annotated with {@link Volatile}</li>
- *   <li>Under keys created by the supplied {@code keyFactory}, the {@code Mutable} interface of:
- *     <ul>
- *       <li>{@link com.twitter.aurora.scheduler.storage.SchedulerStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.JobStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.TaskStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.LockStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.QuotaStore}</li>
- *       <li>{@link com.twitter.aurora.scheduler.storage.AttributeStore}</li>
- *     </ul>
- *   </li>
- * </ul>
- */
-public final class MemStorageModule extends PrivateModule {
-
-  private final KeyFactory keyFactory;
-
-  /**
-   * @param keyFactory Factory for the keys under which bindings are exposed; must not be null.
-   */
-  public MemStorageModule(KeyFactory keyFactory) {
-    this.keyFactory = checkNotNull(keyFactory);
-  }
-
-  @Override
-  protected void configure() {
-    exposeStorageAs(keyFactory.create(Storage.class));
-    exposeStorageAs(Key.get(Storage.class, Volatile.class));
-    bind(MemStorage.class).in(Singleton.class);
-
-    bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
-    bindStore(JobStore.Mutable.class, MemJobStore.class);
-    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
-    bindStore(LockStore.Mutable.class, MemLockStore.class);
-    bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
-    bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
-  }
-
-  // Binds the given Storage key to MemStorage and exposes it outside this private module.
-  private void exposeStorageAs(Key<Storage> storageKey) {
-    bind(storageKey).to(MemStorage.class);
-    expose(storageKey);
-  }
-
-  // Binds a store interface to its singleton implementation, privately under the plain type and
-  // publicly under the key that keyFactory creates for that type.
-  private <T> void bindStore(Class<T> storeInterface, Class<? extends T> storeImpl) {
-    Key<T> exposedKey = keyFactory.create(storeInterface);
-    bind(storeImpl).in(Singleton.class);
-    bind(storeInterface).to(storeImpl);
-    bind(exposedKey).to(storeImpl);
-    expose(exposedKey);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
deleted file mode 100644
index d02511f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemTaskStore.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory task store.
- * <p>
- * Tasks live in a concurrent map keyed by task ID. A synchronized multimap from job key to task
- * IDs serves as a secondary index for queries that are scoped to a single job.
- */
-class MemTaskStore implements TaskStore.Mutable {
-
-  private static final Logger LOG = Logger.getLogger(MemTaskStore.class.getName());
-
-  @CmdLine(name = "slow_query_log_threshold",
-      help = "Log all queries that take at least this long to execute.")
-  private static final Arg<Amount<Long, Time>> SLOW_QUERY_LOG_THRESHOLD =
-      Arg.create(Amount.of(25L, Time.MILLISECONDS));
-
-  private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
-
-  private final Map<String, Task> tasks = Maps.newConcurrentMap();
-  private final Multimap<IJobKey, String> tasksByJobKey =
-      Multimaps.synchronizedSetMultimap(HashMultimap.<IJobKey, String>create());
-
-  // An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
-  // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
-  // rather than the task), but we intuit this detail here for performance reasons.
-  private final Interner<TaskConfig, String> configInterner = new Interner<TaskConfig, String>();
-
-  // Counters for which lookup strategy matches() used.
-  private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id");
-  private final AtomicLong taskQueriesByJob = Stats.exportLong("task_queries_by_job");
-  private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all");
-
-  /**
-   * Fetches the tasks matching a query, logging how long the query took.
-   * <p>
-   * The duration is logged at INFO when it reaches the slow query threshold and at FINE
-   * otherwise.
-   *
-   * @param query Query to evaluate; must not be {@code null}.
-   * @return The matching tasks.
-   */
-  @Timed("mem_storage_fetch_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
-    checkNotNull(query);
-
-    long start = System.nanoTime();
-    ImmutableSet<IScheduledTask> result = matches(query.get()).toSet();
-    long durationNanos = System.nanoTime() - start;
-    Level level = (durationNanos >= slowQueryThresholdNanos) ? Level.INFO : Level.FINE;
-    if (LOG.isLoggable(level)) {
-      long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
-      LOG.log(level, "Query took " + time + " ms: " + query.get());
-    }
-
-    return result;
-  }
-
-  // Wraps a task for storage; constructing the wrapper registers its config with the interner.
-  private final Function<IScheduledTask, Task> toTask =
-      new Function<IScheduledTask, Task>() {
-        @Override public Task apply(IScheduledTask task) {
-          return new Task(task, configInterner);
-        }
-      };
-
-  /**
-   * Stores the given tasks and indexes them by job key.
-   *
-   * @param newTasks Tasks to store; must not be {@code null}.
-   * @throws IllegalStateException If two of the given tasks share a task ID.
-   */
-  @Timed("mem_storage_save_tasks")
-  @Override
-  public void saveTasks(Set<IScheduledTask> newTasks) {
-    checkNotNull(newTasks);
-    Preconditions.checkState(Tasks.ids(newTasks).size() == newTasks.size(),
-        "Proposed new tasks would create task ID collision.");
-
-    // Materialize exactly once. Iterables.transform returns a lazy view, so indexing it twice
-    // would construct every Task twice and run the interner bookkeeping twice per task.
-    ImmutableList<Task> canonicalized = ImmutableList.copyOf(Iterables.transform(newTasks, toTask));
-    tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
-    tasksByJobKey.putAll(taskIdsByJobKey(canonicalized));
-  }
-
-  // Groups the IDs of the given tasks by the job key of each task.
-  private Multimap<IJobKey, String> taskIdsByJobKey(Iterable<Task> toIndex) {
-    return Multimaps.transformValues(
-        Multimaps.index(toIndex, Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED)),
-        TO_ID);
-  }
-
-  /**
-   * Removes every task, the job key index, and all interned configs.
-   */
-  @Timed("mem_storage_delete_all_tasks")
-  @Override
-  public void deleteAllTasks() {
-    tasks.clear();
-    tasksByJobKey.clear();
-    configInterner.clear();
-  }
-
-  /**
-   * Removes the tasks with the given IDs; IDs that are not stored are ignored.
-   *
-   * @param taskIds IDs of the tasks to remove; must not be {@code null}.
-   */
-  @Timed("mem_storage_delete_tasks")
-  @Override
-  public void deleteTasks(Set<String> taskIds) {
-    checkNotNull(taskIds);
-
-    for (String id : taskIds) {
-      Task removed = tasks.remove(id);
-      if (removed != null) {
-        // Keep the secondary index and the interner in step with the primary map.
-        tasksByJobKey.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(removed.task), id);
-        configInterner.removeAssociation(removed.task.getAssignedTask().getTask().newBuilder(), id);
-      }
-    }
-  }
-
-  /**
-   * Applies a mutator to every task matching a query and stores the tasks it changed.
-   *
-   * @param query Query selecting the tasks to offer to the mutator; must not be {@code null}.
-   * @param mutator Function producing the replacement for a task; must not be {@code null}.
-   * @return The replacement tasks that differ from their originals.
-   * @throws IllegalStateException If the mutator changes a task's ID.
-   */
-  @Timed("mem_storage_mutate_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      Query.Builder query,
-      Function<IScheduledTask, IScheduledTask> mutator) {
-
-    checkNotNull(query);
-    checkNotNull(mutator);
-
-    ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
-    for (IScheduledTask original : matches(query.get())) {
-      IScheduledTask maybeMutated = mutator.apply(original);
-      if (!original.equals(maybeMutated)) {
-        Preconditions.checkState(
-            Tasks.id(original).equals(Tasks.id(maybeMutated)),
-            "A task's ID may not be mutated.");
-        // NOTE(review): the job key index is not touched here; presumably a mutation never
-        // changes a task's job key - confirm against callers.
-        tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
-        mutated.add(maybeMutated);
-      }
-    }
-
-    return mutated.build();
-  }
-
-  /**
-   * Replaces the configuration of a stored task.
-   *
-   * @param taskId ID of the task to modify; must not be blank.
-   * @param taskConfiguration Configuration to install; must not be {@code null}.
-   * @return {@code true} if the task was found and rewritten, {@code false} if no task with that
-   *     ID is stored.
-   */
-  @Timed("mem_storage_unsafe_modify_in_place")
-  @Override
-  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
-    MorePreconditions.checkNotBlank(taskId);
-    checkNotNull(taskConfiguration);
-
-    Task stored = tasks.get(taskId);
-    if (stored == null) {
-      return false;
-    }
-
-    ScheduledTask updated = stored.task.newBuilder();
-    updated.getAssignedTask().setTask(taskConfiguration.newBuilder());
-    tasks.put(taskId, toTask.apply(IScheduledTask.build(updated)));
-    return true;
-  }
-
-  // Builds a predicate accepting the tasks that satisfy every criterion set on the query.
-  // Criteria that are unset (null, blank or empty, depending on the field) are not applied.
-  private static Predicate<IScheduledTask> queryFilter(final TaskQuery query) {
-    return new Predicate<IScheduledTask>() {
-      @Override public boolean apply(IScheduledTask task) {
-        ITaskConfig config = task.getAssignedTask().getTask();
-        if (query.getOwner() != null) {
-          String role = query.getOwner().getRole();
-          if (!StringUtils.isBlank(role) && !role.equals(config.getOwner().getRole())) {
-            return false;
-          }
-          String user = query.getOwner().getUser();
-          if (!StringUtils.isBlank(user) && !user.equals(config.getOwner().getUser())) {
-            return false;
-          }
-        }
-        if (query.getEnvironment() != null
-            && !query.getEnvironment().equals(config.getEnvironment())) {
-          return false;
-        }
-        if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) {
-          return false;
-        }
-
-        if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) {
-          return false;
-        }
-
-        if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) {
-          return false;
-        }
-        if (!StringUtils.isEmpty(query.getSlaveHost())
-            && !query.getSlaveHost().equals(task.getAssignedTask().getSlaveHost())) {
-          return false;
-        }
-        if (query.getInstanceIdsSize() > 0
-            && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) {
-          return false;
-        }
-
-        return true;
-      }
-    };
-  }
-
-  // Eagerly looks up the given IDs in the primary map, skipping IDs that are not stored.
-  private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
-    ImmutableList.Builder<Task> matches = ImmutableList.builder();
-    for (String id : taskIds) {
-      Task match = tasks.get(id);
-      if (match != null) {
-        matches.add(match);
-      }
-    }
-    return matches.build();
-  }
-
-  // Selects the narrowest working set available for the query (explicit task IDs, then the job
-  // key index, then every task) and filters it with the full query predicate.
-  private FluentIterable<IScheduledTask> matches(TaskQuery query) {
-    // Apply the query against the working set.
-    Iterable<Task> from;
-    Optional<IJobKey> jobKey = JobKeys.from(Query.arbitrary(query));
-    if (query.isSetTaskIds()) {
-      taskQueriesById.incrementAndGet();
-      from = fromIdIndex(query.getTaskIds());
-    } else if (jobKey.isPresent()) {
-      taskQueriesByJob.incrementAndGet();
-      // Multimap.get never returns null, so no null branch is needed. The collection it returns
-      // is a live view of the synchronized multimap, which callers must lock while iterating.
-      // fromIdIndex copies eagerly, so the lock is released before any filtering takes place.
-      synchronized (tasksByJobKey) {
-        Collection<String> taskIds = tasksByJobKey.get(jobKey.get());
-        from = fromIdIndex(taskIds);
-      }
-    } else {
-      taskQueriesAll.incrementAndGet();
-      from = tasks.values();
-    }
-
-    return FluentIterable.from(from).transform(TO_SCHEDULED).filter(queryFilter(query));
-  }
-
-  private static final Function<Task, IScheduledTask> TO_SCHEDULED =
-      new Function<Task, IScheduledTask>() {
-        @Override public IScheduledTask apply(Task task) {
-          return task.task;
-        }
-      };
-
-  private static final Function<Task, String> TO_ID =
-      Functions.compose(Tasks.SCHEDULED_TO_ID, TO_SCHEDULED);
-
-  /**
-   * Stored form of a task, whose config is the instance returned by the interner.
-   */
-  private static class Task {
-    private final IScheduledTask task;
-
-    Task(IScheduledTask task, Interner<TaskConfig, String> interner) {
-      // NOTE(review): the association for this task ID is removed and then re-added;
-      // presumably this keeps repeated stores of the same task idempotent - confirm against
-      // Interner.
-      interner.removeAssociation(task.getAssignedTask().getTask().newBuilder(), Tasks.id(task));
-      TaskConfig canonical = interner.addAssociation(
-          task.getAssignedTask().getTask().newBuilder(),
-          Tasks.id(task));
-      ScheduledTask builder = task.newBuilder();
-      builder.getAssignedTask().setTask(canonical);
-      this.task = IScheduledTask.build(builder);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
deleted file mode 100644
index cf1b057..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Util.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-
-import org.apache.thrift.TBase;
-
-/**
- * Helpers shared by the in-memory store implementations.
- */
-final class Util {
-
-  private Util() {
-    // Not instantiable: static helpers only.
-  }
-
-  /**
-   * Creates a function that copies a thrift struct of a specific type by calling its
-   * {@code deepCopy()} method. Mutations to the original object will not be reflected in the
-   * copy, and vice versa. A {@code null} input is passed through as {@code null}.
-   *
-   * @return A copier for the provided type of thrift structs.
-   */
-  static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
-    return new Function<T, T>() {
-      // The cast narrows the declared result of deepCopy() back to the struct's own type.
-      @SuppressWarnings("unchecked")
-      @Override public T apply(@Nullable T input) {
-        return (input == null) ? null : (T) input.deepCopy();
-      }
-    };
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
deleted file mode 100644
index d735828..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.testing;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.easymock.Capture;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expect;
-
-/**
- * Auxiliary class to simplify testing against a mocked storage. This allows callers to directly
- * set up call expectations on individual stores rather than writing plumbing code to deal with
- * operations and {@link StoreProvider}.
- */
-public class StorageTestUtil {
-
- public final StoreProvider storeProvider;
- public final MutableStoreProvider mutableStoreProvider;
- public final TaskStore.Mutable taskStore;
- public final QuotaStore.Mutable quotaStore;
- public final AttributeStore.Mutable attributeStore;
- public final JobStore.Mutable jobStore;
- public final LockStore.Mutable lockStore;
- public final SchedulerStore.Mutable schedulerStore;
- public final NonVolatileStorage storage;
-
- /**
- * Creates a new storage test utility.
- *
- * @param easyMock Mocking framework to use for setting up mocks and expectations.
- */
- public StorageTestUtil(EasyMockTest easyMock) {
- this.storeProvider = easyMock.createMock(StoreProvider.class);
- this.mutableStoreProvider = easyMock.createMock(MutableStoreProvider.class);
- this.taskStore = easyMock.createMock(TaskStore.Mutable.class);
- this.quotaStore = easyMock.createMock(QuotaStore.Mutable.class);
- this.attributeStore = easyMock.createMock(AttributeStore.Mutable.class);
- this.jobStore = easyMock.createMock(JobStore.Mutable.class);
- this.lockStore = easyMock.createMock(LockStore.Mutable.class);
- this.schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
- this.storage = easyMock.createMock(NonVolatileStorage.class);
- }
-
- private <T> IExpectationSetters<T> expectConsistentRead() {
- final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.consistentRead(capture(work))).andAnswer(new IAnswer<T>() {
- @Override public T answer() {
- return work.getValue().apply(storeProvider);
- }
- });
- }
-
- private <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
- final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.weaklyConsistentRead(capture(work))).andAnswer(new IAnswer<T>() {
- @Override public T answer() {
- return work.getValue().apply(storeProvider);
- }
- });
- }
-
- private <T> IExpectationSetters<T> expectWriteOperation() {
- final Capture<MutateWork<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.write(capture(work))).andAnswer(new IAnswer<T>() {
- @Override public T answer() {
- return work.getValue().apply(mutableStoreProvider);
- }
- });
- }
-
- /**
- * Expects any number of read or write operations.
- */
- public void expectOperations() {
- expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
- expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
- expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
- expect(storeProvider.getJobStore()).andReturn(jobStore).anyTimes();
- expect(storeProvider.getLockStore()).andReturn(lockStore).anyTimes();
- expect(storeProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
- expect(mutableStoreProvider.getTaskStore()).andReturn(taskStore).anyTimes();
- expect(mutableStoreProvider.getUnsafeTaskStore()).andReturn(taskStore).anyTimes();
- expect(mutableStoreProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
- expect(mutableStoreProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
- expect(mutableStoreProvider.getJobStore()).andReturn(jobStore).anyTimes();
- expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
- expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
- expectConsistentRead().anyTimes();
- expectWeaklyConsistentRead().anyTimes();
- expectWriteOperation().anyTimes();
- }
-
- public IExpectationSetters<?> expectTaskFetch(
- Query.Builder query,
- ImmutableSet<IScheduledTask> result) {
-
- return expect(taskStore.fetchTasks(query)).andReturn(result);
- }
-
- public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
- return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java b/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
deleted file mode 100644
index 2acf5c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerAPIServlet.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.twitter.aurora.scheduler.thrift;
-
-import javax.inject.Inject;
-
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.server.TServlet;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-
-/**
- * A servlet that serves the scheduler Thrift API over HTTP, using the Thrift JSON protocol.
- */
-class SchedulerAPIServlet extends TServlet {
-
-  /**
-   * @param schedulerThriftInterface API implementation that the Thrift processor wraps.
-   */
-  @Inject
-  SchedulerAPIServlet(AuroraAdmin.Iface schedulerThriftInterface) {
-    super(
-        new AuroraAdmin.Processor<AuroraAdmin.Iface>(schedulerThriftInterface),
-        new TJSONProtocol.Factory());
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
deleted file mode 100644
index 3ff0f1c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ /dev/null
@@ -1,1000 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
-import com.twitter.aurora.gen.AcquireLockResult;
-import com.twitter.aurora.gen.AddInstancesConfig;
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.ConfigRewrite;
-import com.twitter.aurora.gen.DrainHostsResult;
-import com.twitter.aurora.gen.EndMaintenanceResult;
-import com.twitter.aurora.gen.GetJobsResult;
-import com.twitter.aurora.gen.GetQuotaResult;
-import com.twitter.aurora.gen.Hosts;
-import com.twitter.aurora.gen.InstanceConfigRewrite;
-import com.twitter.aurora.gen.InstanceKey;
-import com.twitter.aurora.gen.JobConfigRewrite;
-import com.twitter.aurora.gen.JobConfigValidation;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.JobKey;
-import com.twitter.aurora.gen.JobSummary;
-import com.twitter.aurora.gen.JobSummaryResult;
-import com.twitter.aurora.gen.ListBackupsResult;
-import com.twitter.aurora.gen.Lock;
-import com.twitter.aurora.gen.LockKey;
-import com.twitter.aurora.gen.LockValidation;
-import com.twitter.aurora.gen.MaintenanceStatusResult;
-import com.twitter.aurora.gen.PopulateJobResult;
-import com.twitter.aurora.gen.QueryRecoveryResult;
-import com.twitter.aurora.gen.Quota;
-import com.twitter.aurora.gen.Response;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.Result;
-import com.twitter.aurora.gen.RewriteConfigsRequest;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduleStatusResult;
-import com.twitter.aurora.gen.SessionKey;
-import com.twitter.aurora.gen.StartMaintenanceResult;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
-import com.twitter.aurora.scheduler.quota.Quotas;
-import com.twitter.aurora.scheduler.state.CronJobManager;
-import com.twitter.aurora.scheduler.state.LockManager;
-import com.twitter.aurora.scheduler.state.LockManager.LockException;
-import com.twitter.aurora.scheduler.state.MaintenanceController;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.backup.Recovery;
-import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryException;
-import com.twitter.aurora.scheduler.storage.backup.StorageBackup;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
-import com.twitter.aurora.scheduler.thrift.auth.Requires;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.BackoffHelper;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.auth.SessionValidator.SessionContext;
-import static com.twitter.aurora.gen.ResponseCode.AUTH_FAILED;
-import static com.twitter.aurora.gen.ResponseCode.ERROR;
-import static com.twitter.aurora.gen.ResponseCode.INVALID_REQUEST;
-import static com.twitter.aurora.gen.ResponseCode.LOCK_ERROR;
-import static com.twitter.aurora.gen.ResponseCode.OK;
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Aurora scheduler thrift server implementation.
- * <p>
- * Interfaces between users and the scheduler to access/modify jobs and perform cluster
- * administration tasks.
- */
-@DecoratedThrift
-class SchedulerThriftInterface implements AuroraAdmin.Iface {
- private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());
-
- @CmdLine(name = "kill_task_initial_backoff",
- help = "Initial backoff delay while waiting for the tasks to transition to KILLED.")
- private static final Arg<Amount<Long, Time>> KILL_TASK_INITIAL_BACKOFF =
- Arg.create(Amount.of(1L, Time.SECONDS));
-
- @CmdLine(name = "kill_task_max_backoff",
- help = "Max backoff delay while waiting for the tasks to transition to KILLED.")
- private static final Arg<Amount<Long, Time>> KILL_TASK_MAX_BACKOFF =
- Arg.create(Amount.of(30L, Time.SECONDS));
-
- private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
- new Function<ITaskConfig, String>() {
- @Override public String apply(ITaskConfig task) {
- return task.getOwner().getRole();
- }
- },
- Tasks.SCHEDULED_TO_INFO);
-
- private final Storage storage;
- private final SchedulerCore schedulerCore;
- private final LockManager lockManager;
- private final CapabilityValidator sessionValidator;
- private final StorageBackup backup;
- private final Recovery recovery;
- private final MaintenanceController maintenance;
- private final CronJobManager cronJobManager;
- private final Amount<Long, Time> killTaskInitialBackoff;
- private final Amount<Long, Time> killTaskMaxBackoff;
-
- @Inject
- SchedulerThriftInterface(
- Storage storage,
- SchedulerCore schedulerCore,
- LockManager lockManager,
- CapabilityValidator sessionValidator,
- StorageBackup backup,
- Recovery recovery,
- CronJobManager cronJobManager,
- MaintenanceController maintenance) {
-
- this(storage,
- schedulerCore,
- lockManager,
- sessionValidator,
- backup,
- recovery,
- maintenance,
- cronJobManager,
- KILL_TASK_INITIAL_BACKOFF.get(),
- KILL_TASK_MAX_BACKOFF.get());
- }
-
- @VisibleForTesting
- SchedulerThriftInterface(
- Storage storage,
- SchedulerCore schedulerCore,
- LockManager lockManager,
- CapabilityValidator sessionValidator,
- StorageBackup backup,
- Recovery recovery,
- MaintenanceController maintenance,
- CronJobManager cronJobManager,
- Amount<Long, Time> initialBackoff,
- Amount<Long, Time> maxBackoff) {
-
- this.storage = checkNotNull(storage);
- this.schedulerCore = checkNotNull(schedulerCore);
- this.lockManager = checkNotNull(lockManager);
- this.sessionValidator = checkNotNull(sessionValidator);
- this.backup = checkNotNull(backup);
- this.recovery = checkNotNull(recovery);
- this.maintenance = checkNotNull(maintenance);
- this.cronJobManager = checkNotNull(cronJobManager);
- this.killTaskInitialBackoff = checkNotNull(initialBackoff);
- this.killTaskMaxBackoff = checkNotNull(maxBackoff);
- }
-
- @Override
- public Response createJob(
- JobConfiguration mutableJob,
- @Nullable Lock mutableLock,
- SessionKey session) {
-
- IJobConfiguration job = IJobConfiguration.build(mutableJob);
- IJobKey jobKey = JobKeys.assertValid(job.getKey());
- checkNotNull(session);
-
- Response response = new Response();
-
- try {
- sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
- } catch (AuthFailedException e) {
- return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- }
-
- try {
- SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-
- lockManager.validateIfLocked(
- ILockKey.build(LockKey.job(jobKey.newBuilder())),
- Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
- schedulerCore.createJob(sanitized);
- response.setResponseCode(OK)
- .setMessage(String.format("%d new tasks pending for job %s",
- sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
- } catch (LockException e) {
- response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
- } catch (TaskDescriptionException | ScheduleException e) {
- response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
- }
-
- return response;
- }
-
- @Override
- public Response replaceCronTemplate(
- JobConfiguration mutableConfig,
- @Nullable Lock mutableLock,
- SessionKey session) {
-
- checkNotNull(mutableConfig);
- IJobConfiguration job = IJobConfiguration.build(mutableConfig);
- IJobKey jobKey = JobKeys.assertValid(job.getKey());
- checkNotNull(session);
-
- Response response = new Response();
- try {
- sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
- } catch (AuthFailedException e) {
- return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- }
-
- try {
- lockManager.validateIfLocked(
- ILockKey.build(LockKey.job(jobKey.newBuilder())),
- Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
- SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-
- if (!cronJobManager.hasJob(jobKey)) {
- return response.setResponseCode(INVALID_REQUEST).setMessage(
- "No cron template found for the given key: " + jobKey);
- }
- cronJobManager.updateJob(sanitized);
- return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
-
- } catch (LockException e) {
- return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
- } catch (TaskDescriptionException | ScheduleException e) {
- return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
- }
- }
-
- @Override
- public Response populateJobConfig(JobConfiguration description, JobConfigValidation validation) {
-
- checkNotNull(description);
-
- Response response = new Response();
- try {
- SanitizedConfiguration sanitized =
- SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
-
- // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
- if (validation != null && validation == JobConfigValidation.RUN_FILTERS) {
- schedulerCore.validateJobResources(sanitized);
- }
-
- PopulateJobResult result = new PopulateJobResult()
- .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values()));
- response.setResult(Result.populateJobResult(result))
- .setResponseCode(OK)
- .setMessage("Tasks populated");
- } catch (TaskDescriptionException | ScheduleException e) {
- response.setResponseCode(INVALID_REQUEST)
- .setMessage("Invalid configuration: " + e.getMessage());
- }
- return response;
- }
-
- @Override
- public Response startCronJob(JobKey mutableJobKey, SessionKey session) {
- checkNotNull(session);
- IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
-
- Response response = new Response();
- try {
- sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
- } catch (AuthFailedException e) {
- response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- return response;
- }
-
- try {
- schedulerCore.startCronJob(jobKey);
- response.setResponseCode(OK).setMessage("Cron run started.");
- } catch (ScheduleException e) {
- response.setResponseCode(INVALID_REQUEST)
- .setMessage("Failed to start cron job - " + e.getMessage());
- } catch (TaskDescriptionException e) {
- response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
- }
-
- return response;
- }
-
- // TODO(William Farner): Provide status information about cron jobs here.
- @Override
- public Response getTasksStatus(TaskQuery query) {
- checkNotNull(query);
-
- Set<IScheduledTask> tasks =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query));
-
- Response response = new Response();
-
- if (tasks.isEmpty()) {
- response.setResponseCode(INVALID_REQUEST)
- .setMessage("No tasks found for query: " + query);
- } else {
- response.setResponseCode(OK)
- .setResult(Result.scheduleStatusResult(
- new ScheduleStatusResult().setTasks(IScheduledTask.toBuildersList(tasks))));
- }
-
- return response;
- }
-
- @Override
- public Response getJobSummary() {
- Set<IScheduledTask> tasks = Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped());
- Multimap<String, IJobKey> jobsByRole = Multimaps.index(
- FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY),
- JobKeys.TO_ROLE);
-
- Multimap<String, IJobKey> cronJobsByRole = Multimaps.index(
- FluentIterable.from(cronJobManager.getJobs()).transform(JobKeys.FROM_CONFIG),
- JobKeys.TO_ROLE);
-
- List<JobSummary> jobSummaries = Lists.newLinkedList();
- for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) {
- JobSummary summary = new JobSummary();
- summary.setRole(role);
- summary.setJobCount(jobsByRole.get(role).size());
- summary.setCronJobCount(cronJobsByRole.get(role).size());
- jobSummaries.add(summary);
- }
-
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.jobSummaryResult(new JobSummaryResult(jobSummaries)));
- }
-
- @Override
- public Response getJobs(@Nullable String maybeNullRole) {
- Optional<String> ownerRole = Optional.fromNullable(maybeNullRole);
-
-
- // Ensure we only return one JobConfiguration for each JobKey.
- Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap();
-
- // Query the task store, find immediate jobs, and synthesize a JobConfiguration for them.
- // This is necessary because the ImmediateJobManager doesn't store jobs directly and
- // ImmediateJobManager#getJobs always returns an empty Collection.
- Query.Builder scope = ownerRole.isPresent()
- ? Query.roleScoped(ownerRole.get())
- : Query.unscoped();
- Multimap<IJobKey, IScheduledTask> tasks =
- Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, scope.active()));
-
- jobs.putAll(Maps.transformEntries(tasks.asMap(),
- new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() {
- @Override
- public IJobConfiguration transformEntry(
- IJobKey jobKey,
- Collection<IScheduledTask> tasks) {
-
- // Pick an arbitrary task for each immediate job. The chosen task might not be the most
- // recent if the job is in the middle of an update or some shards have been selectively
- // created.
- TaskConfig firstTask = tasks.iterator().next().getAssignedTask().getTask().newBuilder();
- return IJobConfiguration.build(new JobConfiguration()
- .setKey(jobKey.newBuilder())
- .setOwner(firstTask.getOwner())
- .setTaskConfig(firstTask)
- .setInstanceCount(tasks.size()));
- }
- }));
-
- // Get cron jobs directly from the manager. Do this after querying the task store so the real
- // template JobConfiguration for a cron job will overwrite the synthesized one that could have
- // been created above.
- Predicate<IJobConfiguration> configFilter = ownerRole.isPresent()
- ? Predicates.compose(Predicates.equalTo(ownerRole.get()), JobKeys.CONFIG_TO_ROLE)
- : Predicates.<IJobConfiguration>alwaysTrue();
- jobs.putAll(Maps.uniqueIndex(
- FluentIterable.from(cronJobManager.getJobs()).filter(configFilter),
- JobKeys.FROM_CONFIG));
-
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.getJobsResult(new GetJobsResult()
- .setConfigs(IJobConfiguration.toBuildersSet(jobs.values()))));
- }
-
- private void validateLockForTasks(Optional<ILock> lock, Iterable<IScheduledTask> tasks)
- throws LockException {
-
- ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks)
- .transform(Tasks.SCHEDULED_TO_JOB_KEY)
- .toSet();
-
- // Validate lock against every unique job key derived from the tasks.
- for (IJobKey key : uniqueKeys) {
- lockManager.validateIfLocked(ILockKey.build(LockKey.job(key.newBuilder())), lock);
- }
- }
-
- private SessionContext validateSessionKeyForTasks(
- SessionKey session,
- TaskQuery taskQuery,
- Iterable<IScheduledTask> tasks) throws AuthFailedException {
-
- // Authenticate the session against any affected roles, always including the role for a
- // role-scoped query. This papers over the implementation detail that dormant cron jobs are
- // authenticated this way.
- ImmutableSet.Builder<String> targetRoles = ImmutableSet.<String>builder()
- .addAll(FluentIterable.from(tasks).transform(GET_ROLE));
- if (taskQuery.isSetOwner()) {
- targetRoles.add(taskQuery.getOwner().getRole());
- }
- return sessionValidator.checkAuthenticated(session, targetRoles.build());
- }
-
- private Optional<SessionContext> isAdmin(SessionKey session) {
- try {
- return Optional.of(
- sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED));
- } catch (AuthFailedException e) {
- return Optional.absent();
- }
- }
-
- @Override
- public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) {
- checkNotNull(query);
- checkNotNull(session);
-
- Response response = new Response();
-
- if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) {
- response.setResponseCode(INVALID_REQUEST).setMessage(
- String.format("Invalid job name: '%s'", query.getJobName()));
- return response;
- }
-
- Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query));
-
- Optional<SessionContext> context = isAdmin(session);
- if (context.isPresent()) {
- LOG.info("Granting kill query to admin user: " + query);
- } else {
- try {
- context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
- } catch (AuthFailedException e) {
- response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- return response;
- }
- }
-
- try {
- validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks);
- schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity());
- } catch (LockException e) {
- return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
- } catch (ScheduleException e) {
- return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
- }
-
- // TODO(William Farner): Move this into the client.
- BackoffHelper backoff = new BackoffHelper(killTaskInitialBackoff, killTaskMaxBackoff, true);
- final Query.Builder activeQuery = Query.arbitrary(query.setStatuses(Tasks.ACTIVE_STATES));
- try {
- backoff.doUntilSuccess(new Supplier<Boolean>() {
- @Override public Boolean get() {
- Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, activeQuery);
- if (tasks.isEmpty()) {
- LOG.info("Tasks all killed, done waiting.");
- return true;
- } else {
- LOG.info("Jobs not yet killed, waiting...");
- return false;
- }
- }
- });
- response.setResponseCode(OK).setMessage("Tasks killed.");
- } catch (InterruptedException e) {
- LOG.warning("Interrupted while trying to kill tasks: " + e);
- Thread.currentThread().interrupt();
- response.setResponseCode(ERROR).setMessage("killTasks thread was interrupted.");
- } catch (BackoffHelper.BackoffStoppedException e) {
- response.setResponseCode(ERROR).setMessage("Tasks were not killed in time.");
- }
- return response;
- }
-
- @Override
- public Response restartShards(
- JobKey mutableJobKey,
- Set<Integer> shardIds,
- Lock mutableLock,
- SessionKey session) {
-
- IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
- MorePreconditions.checkNotBlank(shardIds);
- checkNotNull(session);
-
- Response response = new Response();
- SessionContext context;
- try {
- context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
- } catch (AuthFailedException e) {
- response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- return response;
- }
-
- try {
- lockManager.validateIfLocked(
- ILockKey.build(LockKey.job(jobKey.newBuilder())),
- Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
- response.setResponseCode(OK).setMessage("Shards are restarting.");
- } catch (LockException e) {
- response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage());
- } catch (ScheduleException e) {
- response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage());
- }
-
- return response;
- }
-
- @Override
- public Response getQuota(final String ownerRole) {
- checkNotBlank(ownerRole);
-
- IQuota quota = storage.consistentRead(new Work.Quiet<IQuota>() {
- @Override public IQuota apply(StoreProvider storeProvider) {
- return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
- }
- });
-
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.getQuotaResult(new GetQuotaResult()
- .setQuota(quota.newBuilder())));
- }
-
- @Override
- public Response startMaintenance(Hosts hosts, SessionKey session) {
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.startMaintenanceResult(new StartMaintenanceResult()
- .setStatuses(maintenance.startMaintenance(hosts.getHostNames()))));
- }
-
- @Override
- public Response drainHosts(Hosts hosts, SessionKey session) {
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.drainHostsResult(new DrainHostsResult()
- .setStatuses(maintenance.drain(hosts.getHostNames()))));
- }
-
- @Override
- public Response maintenanceStatus(Hosts hosts, SessionKey session) {
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.maintenanceStatusResult(new MaintenanceStatusResult()
- .setStatuses(maintenance.getStatus(hosts.getHostNames()))));
- }
-
- @Override
- public Response endMaintenance(Hosts hosts, SessionKey session) {
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.endMaintenanceResult(new EndMaintenanceResult()
- .setStatuses(maintenance.endMaintenance(hosts.getHostNames()))));
- }
-
- @Requires(whitelist = Capability.PROVISIONER)
- @Override
- public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
- checkNotBlank(ownerRole);
- checkNotNull(quota);
- checkNotNull(session);
-
- // TODO(Kevin Sweeney): Input validation for Quota.
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
- }
- });
-
- return new Response().setResponseCode(OK).setMessage("Quota applied.");
- }
-
- @Override
- public Response forceTaskState(
- String taskId,
- ScheduleStatus status,
- SessionKey session) {
-
- checkNotBlank(taskId);
- checkNotNull(status);
- checkNotNull(session);
-
- Response response = new Response();
- SessionContext context;
- try {
- // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
- context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
- } catch (AuthFailedException e) {
- response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- return response;
- }
-
- schedulerCore.setTaskStatus(
- Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
- return new Response().setResponseCode(OK).setMessage("Transition attempted.");
- }
-
- @Override
- public Response performBackup(SessionKey session) {
- backup.backupNow();
- return new Response().setResponseCode(OK);
- }
-
- @Override
- public Response listBackups(SessionKey session) {
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.listBackupsResult(new ListBackupsResult()
- .setBackups(recovery.listBackups())));
- }
-
- @Override
- public Response stageRecovery(String backupId, SessionKey session) {
- Response response = new Response().setResponseCode(OK);
- try {
- recovery.stage(backupId);
- } catch (RecoveryException e) {
- response.setResponseCode(ERROR).setMessage(e.getMessage());
- LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e);
- }
-
- return response;
- }
-
- @Override
- public Response queryRecovery(TaskQuery query, SessionKey session) {
- Response response = new Response();
- try {
- response.setResponseCode(OK)
- .setResult(Result.queryRecoveryResult(new QueryRecoveryResult()
- .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
- } catch (RecoveryException e) {
- response.setResponseCode(ERROR).setMessage(e.getMessage());
- LOG.log(Level.WARNING, "Failed to query recovery: " + e, e);
- }
-
- return response;
- }
-
- @Override
- public Response deleteRecoveryTasks(TaskQuery query, SessionKey session) {
- Response response = new Response().setResponseCode(OK);
- try {
- recovery.deleteTasks(Query.arbitrary(query));
- } catch (RecoveryException e) {
- response.setResponseCode(ERROR).setMessage(e.getMessage());
- LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e);
- }
-
- return response;
- }
-
- @Override
- public Response commitRecovery(SessionKey session) {
- Response response = new Response().setResponseCode(OK);
- try {
- recovery.commit();
- } catch (RecoveryException e) {
- response.setResponseCode(ERROR).setMessage(e.getMessage());
- }
-
- return response;
- }
-
- @Override
- public Response unloadRecovery(SessionKey session) {
- recovery.unload();
- return new Response().setResponseCode(OK);
- }
-
- @Override
- public Response snapshot(SessionKey session) {
- Response response = new Response();
- try {
- storage.snapshot();
- return response.setResponseCode(OK).setMessage("Compaction successful.");
- } catch (Storage.StorageException e) {
- LOG.log(Level.WARNING, "Requested snapshot failed.", e);
- return response.setResponseCode(ERROR).setMessage(e.getMessage());
- }
- }
-
- private static Multimap<String, IJobConfiguration> jobsByKey(JobStore jobStore, IJobKey jobKey) {
- ImmutableMultimap.Builder<String, IJobConfiguration> matches = ImmutableMultimap.builder();
- for (String managerId : jobStore.fetchManagerIds()) {
- for (IJobConfiguration job : jobStore.fetchJobs(managerId)) {
- if (job.getKey().equals(jobKey)) {
- matches.put(managerId, job);
- }
- }
- }
- return matches.build();
- }
-
- @Override
- public Response rewriteConfigs(
- final RewriteConfigsRequest request,
- SessionKey session) {
-
- if (request.getRewriteCommandsSize() == 0) {
- return new Response()
- .setResponseCode(ResponseCode.ERROR)
- .setMessage("No rewrite commands provided.");
- }
-
- return storage.write(new MutateWork.Quiet<Response>() {
- @Override public Response apply(MutableStoreProvider storeProvider) {
- List<String> errors = Lists.newArrayList();
-
- for (ConfigRewrite command : request.getRewriteCommands()) {
- Optional<String> error = rewriteConfig(command, storeProvider);
- if (error.isPresent()) {
- errors.add(error.get());
- }
- }
-
- Response resp = new Response();
- if (!errors.isEmpty()) {
- resp.setResponseCode(ResponseCode.WARNING).setMessage(Joiner.on(", ").join(errors));
- } else {
- resp.setResponseCode(OK).setMessage("All rewrites completed successfully.");
- }
- return resp;
- }
- });
- }
-
- private Optional<String> rewriteConfig(
- ConfigRewrite command,
- MutableStoreProvider storeProvider) {
-
- Optional<String> error = Optional.absent();
- switch (command.getSetField()) {
- case JOB_REWRITE:
- JobConfigRewrite jobRewrite = command.getJobRewrite();
- IJobConfiguration existingJob = IJobConfiguration.build(jobRewrite.getOldJob());
- IJobConfiguration rewrittenJob;
- try {
- rewrittenJob = ConfigurationManager.validateAndPopulate(
- IJobConfiguration.build(jobRewrite.getRewrittenJob()));
- } catch (TaskDescriptionException e) {
- // We could add an error here, but this is probably a hint of something wrong in
- // the client that's causing a bad configuration to be applied.
- throw Throwables.propagate(e);
- }
- if (!existingJob.getKey().equals(rewrittenJob.getKey())) {
- error = Optional.of("Disallowing rewrite attempting to change job key.");
- } else if (!existingJob.getOwner().equals(rewrittenJob.getOwner())) {
- error = Optional.of("Disallowing rewrite attempting to change job owner.");
- } else {
- JobStore.Mutable jobStore = storeProvider.getJobStore();
- Multimap<String, IJobConfiguration> matches =
- jobsByKey(jobStore, existingJob.getKey());
- switch (matches.size()) {
- case 0:
- error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
- break;
-
- case 1:
- Map.Entry<String, IJobConfiguration> match =
- Iterables.getOnlyElement(matches.entries());
- IJobConfiguration storedJob = match.getValue();
- if (!storedJob.equals(existingJob)) {
- error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
- } else {
- jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
- }
- break;
-
- default:
- error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
- }
- }
- break;
-
- case INSTANCE_REWRITE:
- InstanceConfigRewrite instanceRewrite = command.getInstanceRewrite();
- InstanceKey instanceKey = instanceRewrite.getInstanceKey();
- Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(
- Query.instanceScoped(IJobKey.build(instanceKey.getJobKey()),
- instanceKey.getInstanceId())
- .active());
- Optional<IAssignedTask> task =
- Optional.fromNullable(Iterables.getOnlyElement(tasks, null))
- .transform(Tasks.SCHEDULED_TO_ASSIGNED);
- if (!task.isPresent()) {
- error = Optional.of("No active task found for " + instanceKey);
- } else if (!task.get().getTask().newBuilder().equals(instanceRewrite.getOldTask())) {
- error = Optional.of("CAS compare failed for " + instanceKey);
- } else {
- ITaskConfig newConfiguration = ITaskConfig.build(
- ConfigurationManager.applyDefaultsIfUnset(instanceRewrite.getRewrittenTask()));
- boolean changed = storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(
- task.get().getTaskId(), newConfiguration);
- if (!changed) {
- error = Optional.of("Did not change " + task.get().getTaskId());
- }
- }
- break;
-
- default:
- throw new IllegalArgumentException("Unhandled command type " + command.getSetField());
- }
-
- return error;
- }
-
-  /**
-   * Reports the API version implemented by this scheduler.
-   *
-   * @return An {@code OK} response whose result carries {@code CURRENT_API_VERSION}.
-   */
-  @Override
-  public Response getVersion() {
-    Response versionResponse = new Response();
-    versionResponse.setResponseCode(OK);
-    versionResponse.setResult(Result.getVersionResult(CURRENT_API_VERSION));
-    return versionResponse;
-  }
-
-  /**
-   * Adds instances to a non-cron job.
-   *
-   * <p>The caller is authenticated against the job's role, the task configuration is validated
-   * and populated, cron jobs are rejected, and any lock on the job is validated before the
-   * instances are handed to the scheduler core.
-   *
-   * @param config Job key, instance IDs and task configuration for the instances to add.
-   * @param mutableLock Lock held by the caller, or {@code null} if none.
-   * @param session Session used to authenticate the caller.
-   * @return {@code OK} on success; otherwise {@code AUTH_FAILED}, {@code LOCK_ERROR} or
-   *     {@code INVALID_REQUEST} with the failure message.
-   */
-  @Override
-  public Response addInstances(
-      AddInstancesConfig config,
-      @Nullable Lock mutableLock,
-      SessionKey session) {
-
-    checkNotNull(config);
-    checkNotNull(session);
-    checkNotBlank(config.getInstanceIds());
-    IJobKey key = JobKeys.assertValid(IJobKey.build(config.getKey()));
-
-    Response response = new Response();
-    try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(key.getRole()));
-      ITaskConfig populatedTask = ConfigurationManager.validateAndPopulate(
-          ITaskConfig.build(config.getTaskConfig()));
-
-      if (cronJobManager.hasJob(key)) {
-        response.setResponseCode(INVALID_REQUEST);
-        response.setMessage("Cron jobs are not supported here.");
-        return response;
-      }
-
-      // Build the job-scoped lock key and the caller's lock (if any) before validating.
-      ILockKey jobLock = ILockKey.build(LockKey.job(key.newBuilder()));
-      Optional<ILock> heldLock = Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER);
-      lockManager.validateIfLocked(jobLock, heldLock);
-
-      schedulerCore.addInstances(
-          key,
-          ImmutableSet.copyOf(config.getInstanceIds()),
-          populatedTask);
-      response.setResponseCode(OK);
-      response.setMessage("Successfully added instances.");
-      return response;
-    } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED);
-      response.setMessage(e.getMessage());
-      return response;
-    } catch (LockException e) {
-      response.setResponseCode(LOCK_ERROR);
-      response.setMessage(e.getMessage());
-      return response;
-    } catch (TaskDescriptionException | ScheduleException e) {
-      response.setResponseCode(INVALID_REQUEST);
-      response.setMessage(e.getMessage());
-      return response;
-    }
-  }
-
-  /**
-   * Extracts the role that must be authenticated to operate on the given lock key.
-   *
-   * @param lockKey Lock key to inspect.
-   * @return Role of the job the key refers to.
-   * @throws IllegalArgumentException If the key's set field is not handled here.
-   */
-  private String getRoleFromLockKey(ILockKey lockKey) {
-    switch (lockKey.getSetField()) {
-      case JOB:
-        IJobKey lockedJob = lockKey.getJob();
-        JobKeys.assertValid(lockedJob);
-        return lockedJob.getRole();
-
-      default:
-        throw new IllegalArgumentException("Unhandled LockKey: " + lockKey.getSetField());
-    }
-  }
-
-  /**
-   * Acquires the lock identified by the given key on behalf of the authenticated caller.
-   *
-   * @param mutableLockKey Key of the lock to acquire.
-   * @param session Session used to authenticate the caller against the key's role.
-   * @return {@code OK} with the acquired lock as the result, or {@code AUTH_FAILED} /
-   *     {@code LOCK_ERROR} with the failure message.
-   */
-  @Override
-  public Response acquireLock(LockKey mutableLockKey, SessionKey session) {
-    checkNotNull(mutableLockKey);
-    checkNotNull(session);
-
-    ILockKey key = ILockKey.build(mutableLockKey);
-    Response outcome = new Response();
-
-    try {
-      String requiredRole = getRoleFromLockKey(key);
-      SessionContext caller =
-          sessionValidator.checkAuthenticated(session, ImmutableSet.of(requiredRole));
-
-      ILock acquired = lockManager.acquireLock(key, caller.getIdentity());
-      AcquireLockResult payload = new AcquireLockResult().setLock(acquired.newBuilder());
-      outcome.setResult(Result.acquireLockResult(payload));
-
-      outcome.setResponseCode(OK);
-      outcome.setMessage("Lock has been acquired.");
-      return outcome;
-    } catch (AuthFailedException e) {
-      outcome.setResponseCode(AUTH_FAILED);
-      outcome.setMessage(e.getMessage());
-      return outcome;
-    } catch (LockException e) {
-      outcome.setResponseCode(LOCK_ERROR);
-      outcome.setMessage(e.getMessage());
-      return outcome;
-    }
-  }
-
-  /**
-   * Releases the given lock after authenticating the caller against the lock key's role.
-   *
-   * @param mutableLock Lock to release.
-   * @param validation When {@code CHECKED}, the lock is validated against the lock manager
-   *     before it is released.
-   * @param session Session used to authenticate the caller.
-   * @return {@code OK} on success, or {@code AUTH_FAILED} / {@code LOCK_ERROR} with the failure
-   *     message.
-   */
-  @Override
-  public Response releaseLock(Lock mutableLock, LockValidation validation, SessionKey session) {
-    checkNotNull(mutableLock);
-    checkNotNull(validation);
-    checkNotNull(session);
-
-    Response outcome = new Response();
-    ILock heldLock = ILock.build(mutableLock);
-
-    try {
-      String requiredRole = getRoleFromLockKey(heldLock.getKey());
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(requiredRole));
-
-      boolean mustValidate = validation == LockValidation.CHECKED;
-      if (mustValidate) {
-        lockManager.validateIfLocked(heldLock.getKey(), Optional.of(heldLock));
-      }
-      lockManager.releaseLock(heldLock);
-
-      outcome.setResponseCode(OK);
-      outcome.setMessage("Lock has been released.");
-      return outcome;
-    } catch (AuthFailedException e) {
-      outcome.setResponseCode(AUTH_FAILED);
-      outcome.setMessage(e.getMessage());
-      return outcome;
-    } catch (LockException e) {
-      outcome.setResponseCode(LOCK_ERROR);
-      outcome.setMessage(e.getMessage());
-      return outcome;
-    }
-  }
-
-  /**
-   * Builds the audit message recorded when a user forces a state transition.
-   *
-   * @param user Name of the user forcing the transition.
-   * @return The message, always present.
-   */
-  @VisibleForTesting
-  static Optional<String> transitionMessage(String user) {
-    String message = "Transition forced by " + user;
-    return Optional.of(message);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
deleted file mode 100644
index c6e9b18..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftConfiguration.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Optional;
-
-/**
- * Container for thrift server configuration options.
- *
- * <p>Exposes two settings: an optional SSL key stream and the port to serve on.
- */
-public interface ThriftConfiguration {
- /**
- * Gets a stream for the thrift socket SSL key if this server is configured to use SSL.
- *
- * <p>NOTE(review): this interface does not state who is responsible for closing the returned
- * stream; presumably the caller owns it - confirm against implementations.
- *
- * @return A stream that contains the SSL key data if SSL is enabled, absent otherwise.
- * @throws IOException If the stream could not be opened.
- */
- Optional<? extends InputStream> getSslKeyStream() throws IOException;
-
- /**
- * Gets the port that the thrift server should listen on.
- *
- * @return Thrift server port.
- */
- int getServingPort();
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
deleted file mode 100644
index cca9053..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftModule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.scheduler.thrift.aop.AopModule;
-import com.twitter.common.application.http.Registration;
-import com.twitter.common.application.modules.LifecycleModule;
-
-/**
- * Binding module to configure a thrift server.
- *
- * <p>Binds the thrift service interface to its implementation, registers the server launcher
- * as a service runner, registers the API servlet and installs the AOP module.
- */
-public class ThriftModule extends AbstractModule {
-
- @Override
- protected void configure() {
- // Requests for the AuroraAdmin service interface resolve to SchedulerThriftInterface.
- bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
- // ThriftServer is bound in singleton scope.
- bind(ThriftServer.class).in(Singleton.class);
- // Registers ThriftServerLauncher as a service runner with the lifecycle module.
- LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
-
- // Registers SchedulerAPIServlet under "/api".
- // NOTE(review): the meaning of the trailing 'true' argument is not visible here - confirm
- // against Registration.registerServlet.
- Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
-
- // Installs the bindings declared by the thrift AOP module.
- install(new AopModule());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
deleted file mode 100644
index 7b9abd1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.net.ServerSocket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-
-import com.twitter.thrift.Status;
-
-/**
- * Wraps a thrift thread-pool server, tracking its health status and running its serve loop on
- * a dedicated thread.
- */
-class ThriftServer {
-  private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
-
-  // Server most recently started; reset to null by shutdown().
-  private TServer server = null;
-
-  // Current health status of the server.
-  private Status status = Status.STARTING;
-
-  /**
-   * Starts the server.
-   * This may be called at any point except when the server is already alive. That is, it's
-   * allowable to start, stop, and re-start the server.
-   *
-   * @param socket The socket to use.
-   * @param processor The processor to handle requests.
-   */
-  public synchronized void start(ServerSocket socket, TProcessor processor) {
-    Preconditions.checkNotNull(socket);
-    Preconditions.checkNotNull(processor);
-    Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
-    setStatus(Status.ALIVE);
-
-    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(socket))
-        .processor(processor)
-        .protocolFactory(new TBinaryProtocol.Factory(false, true));
-    final TServer launched = new TThreadPoolServer(serverArgs);
-    server = launched;
-    LOG.info("Starting thrift server on port " + socket.getLocalPort());
-
-    // The serve loop runs on its own non-daemon thread.
-    new ThreadFactoryBuilder()
-        .setDaemon(false)
-        .build()
-        .newThread(serveTask(launched))
-        .start();
-  }
-
-  /**
-   * Creates the task that runs the serve loop of {@code target}, marking this server dead if the
-   * loop terminates with an exception.
-   */
-  private Runnable serveTask(final TServer target) {
-    return new Runnable() {
-      @Override public void run() {
-        try {
-          target.serve();
-        } catch (Throwable t) {
-          LOG.log(Level.WARNING,
-              "Uncaught exception while attempting to handle service requests: " + t, t);
-          setStatus(Status.DEAD);
-        }
-      }
-    };
-  }
-
-  // Logs and records a status transition.
-  private synchronized void setStatus(Status status) {
-    LOG.info("Moving from status " + this.status + " to " + status);
-    this.status = status;
-  }
-
-  /**
-   * Attempts to shut down the server.
-   * The server may be shut down at any time, though the request will be ignored if the server is
-   * already stopped.
-   */
-  public synchronized void shutdown() {
-    if (status == Status.STOPPED) {
-      LOG.info("Server already stopped, shutdown request ignored.");
-      return;
-    }
-
-    LOG.info("Received shutdown request, stopping server.");
-    setStatus(Status.STOPPING);
-
-    // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
-    // is stopped. Might want to allow a sleep period for the active requests to be completed.
-
-    TServer running = server;
-    if (running != null) {
-      running.stop();
-    }
-    server = null;
-
-    setStatus(Status.STOPPED);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
deleted file mode 100644
index 6743060..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/ThriftServerLauncher.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import com.google.common.base.Optional;
-
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.AuroraAdmin.Iface;
-import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
-import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
-import com.twitter.common.base.Command;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Service launcher that starts up and registers the scheduler thrift server as a primary service
- * for the application.
- */
-class ThriftServerLauncher implements ServiceRunner {
-
-  private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
-
-  private final ThriftConfiguration configuration;
-
-  // Security is enforced via file permissions, not via this password, for what it's worth.
-  private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
-
-  private final Iface schedulerThriftInterface;
-  private final ThriftServer schedulerThriftServer;
-
-  @Inject
-  ThriftServerLauncher(
-      Iface schedulerThriftInterface,
-      ThriftServer schedulerThriftServer,
-      ThriftConfiguration configuration) {
-
-    this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
-    this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
-    this.configuration = checkNotNull(configuration);
-  }
-
-  /**
-   * Opens the server socket, starts the thrift server on it and returns a primary-service
-   * handle whose shutdown command stops the server.
-   *
-   * @return Primary service descriptor bound to the socket's local port.
-   */
-  @Override
-  public LocalService launch() {
-    ServerSocket socket = getServerSocket();
-    schedulerThriftServer.start(
-        socket,
-        new AuroraAdmin.Processor<>(schedulerThriftInterface));
-
-    Command shutdown = new Command() {
-      @Override public void execute() {
-        LOG.info("Stopping thrift server.");
-        schedulerThriftServer.shutdown();
-      }
-    };
-
-    return LocalService.primaryService(socket.getLocalPort(), shutdown);
-  }
-
-  /**
-   * Creates the socket the thrift server listens on: a plain socket when no SSL key stream is
-   * configured, otherwise an SSL socket backed by the JKS key store read from that stream.
-   *
-   * @return Server socket bound to the configured serving port.
-   * @throws RuntimeException If the key data cannot be read or SSL setup fails.
-   */
-  private ServerSocket getServerSocket() {
-    try {
-      Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
-      if (!sslKeyStream.isPresent()) {
-        LOG.warning("Running Thrift Server without SSL.");
-        return new ServerSocket(configuration.getServingPort());
-      } else {
-        // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
-        KeyStore ks = KeyStore.getInstance("JKS");
-        // The key store is fully read by load(); close the stream afterwards so the underlying
-        // resource is not leaked, including when loading fails.
-        try (InputStream keyData = sslKeyStream.get()) {
-          ks.load(keyData, SSL_KEYFILE_PASSWORD.toCharArray());
-        }
-
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-        kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
-
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(kmf.getKeyManagers(), null, null);
-
-        SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
-        SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
-            configuration.getServingPort());
-        // NOTE(review): this enables every suite the socket supports, which may include suites
-        // that are not enabled by default; confirm this is intended before tightening it.
-        serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
-        serverSocket.setNeedClientAuth(false);
-        return serverSocket;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to read key file.", e);
-    } catch (GeneralSecurityException e) {
-      throw new RuntimeException("SSL setup failed.", e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
deleted file mode 100644
index d66a2b2..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/APIVersionInterceptor.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.gen.Response;
-
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-
-/**
- * Method interceptor that stamps {@code CURRENT_API_VERSION} onto any intercepted
- * {@link Response} that does not already carry a version.
- */
-class APIVersionInterceptor implements MethodInterceptor {
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    Object outcome = invocation.proceed();
-    Response response = (Response) outcome;
-
-    // Leave a version set by the intercepted method untouched.
-    if (response.version != null) {
-      return response;
-    }
-
-    response.setVersion(CURRENT_API_VERSION);
-    return response;
-  }
-}