You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:36 UTC
[43/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
deleted file mode 100644
index cf74e49..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/SchedulerCoreImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.TaskIdGenerator;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.Positive;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
-import static com.twitter.aurora.scheduler.base.Tasks.ACTIVE_STATES;
-
-/**
- * Implementation of the scheduler core.
- */
-class SchedulerCoreImpl implements SchedulerCore {
- @Positive
- @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
- public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
-
- private static final Logger LOG = Logger.getLogger(SchedulerCoreImpl.class.getName());
-
- private final Storage storage;
-
- private final CronJobManager cronScheduler;
-
- // Schedulers that are responsible for triggering execution of jobs.
- private final ImmutableList<JobManager> jobManagers;
-
- // TODO(Bill Farner): Avoid using StateManagerImpl.
- // State manager handles persistence of task modifications and state transitions.
- private final StateManagerImpl stateManager;
-
- private final TaskIdGenerator taskIdGenerator;
- private final JobFilter jobFilter;
-
- /**
-  * Builds the scheduler core from its injected collaborators.
-  *
-  * @param storage Backing store implementation.
-  * @param cronScheduler Manager for cron-triggered jobs.
-  * @param immediateScheduler Manager for jobs that are run immediately.
-  * @param stateManager Persistent state manager.
-  * @param taskIdGenerator Task ID generator.
-  * @param jobFilter Job filter.
-  */
- @Inject
- public SchedulerCoreImpl(
-     Storage storage,
-     CronJobManager cronScheduler,
-     ImmediateJobManager immediateScheduler,
-     StateManagerImpl stateManager,
-     TaskIdGenerator taskIdGenerator,
-     JobFilter jobFilter) {
-
-   this.storage = checkNotNull(storage);
-
-   // Order matters: the immediate scheduler accepts any job, so every other manager has to be
-   // consulted before it.
-   ImmutableList<JobManager> orderedManagers =
-       ImmutableList.of(cronScheduler, immediateScheduler);
-   this.jobManagers = orderedManagers;
-   this.cronScheduler = cronScheduler;
-   this.stateManager = checkNotNull(stateManager);
-   this.taskIdGenerator = checkNotNull(taskIdGenerator);
-   this.jobFilter = checkNotNull(jobFilter);
- }
-
- /**
-  * Checks whether any of the job managers already holds the given job.
-  *
-  * @param job Job to look for.
-  * @return {@code true} if at least one manager reports having the job.
-  */
- private boolean hasActiveJob(IJobConfiguration job) {
-   Predicate<JobManager> ownsJob = managerHasJob(job);
-   for (JobManager candidate : jobManagers) {
-     if (ownsJob.apply(candidate)) {
-       return true;
-     }
-   }
-   return false;
- }
-
- /**
-  * Moves the given tasks to {@code UNKNOWN}, without an audit message.
-  *
-  * @param taskIds IDs of the tasks that were deleted.
-  */
- @Override
- public synchronized void tasksDeleted(Set<String> taskIds) {
-   Optional<String> noMessage = Optional.absent();
-   setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, noMessage);
- }
-
- /**
-  * Registers a new job with the first job manager willing to accept it.
-  *
-  * @param sanitizedConfiguration Sanitized configuration of the job to create.
-  * @throws ScheduleException If the job already exists, fails the job filters, or is not
-  *     accepted by any job manager.
-  */
- @Override
- public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
-     throws ScheduleException {
-
-   IJobConfiguration config = sanitizedConfiguration.getJobConfig();
-   if (hasActiveJob(config)) {
-     throw new ScheduleException("Job already exists: " + JobKeys.toPath(config));
-   }
-
-   runJobFilters(config.getKey(), config.getTaskConfig(), config.getInstanceCount(), false);
-
-   // Offer the job to each manager in order and stop at the first one that takes it.
-   JobManager acceptor = null;
-   for (JobManager candidate : jobManagers) {
-     if (candidate.receiveJob(sanitizedConfiguration)) {
-       acceptor = candidate;
-       break;
-     }
-   }
-
-   if (acceptor == null) {
-     LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
-     LOG.severe("Discarded job: " + config);
-     throw new ScheduleException("Job not accepted, discarding.");
-   }
-
-   LOG.info("Job accepted by manager: " + acceptor.getUniqueKey());
- }
-
- // This number is derived from the maximum file name length limit on most UNIX systems, less
- // the number of characters we've observed being added by mesos for the executor ID, prefix, and
- // delimiters.
- @VisibleForTesting
- static final int MAX_TASK_ID_LENGTH = 255 - 90;
-
- // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
- /**
-  * Validates a job (or an addition to a job) against the task ID length limit, the job filter
-  * and the per-job task limit, in that order.
-  *
-  * @param jobKey Key of the job being validated.
-  * @param task Task configuration to validate.
-  * @param count Number of instances being requested.
-  * @param incremental Whether {@code count} is added on top of the job's currently active tasks.
-  * @throws ScheduleException If any of the checks fails.
-  */
- private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
-     throws ScheduleException {
-
-   // For incremental requests the limits apply to the existing active tasks plus the new ones.
-   int alreadyActive = incremental
-       ? Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size()
-       : 0;
-   int instanceCount = count + alreadyActive;
-
-   // TODO(maximk): This is a short-term hack to stop the bleeding from
-   // https://issues.apache.org/jira/browse/MESOS-691
-   String sampleTaskId = taskIdGenerator.generate(task, instanceCount);
-   if (sampleTaskId.length() > MAX_TASK_ID_LENGTH) {
-     throw new ScheduleException(
-         "Task ID is too long, please shorten your role or job name.");
-   }
-
-   // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
-   JobFilter.JobFilterResult verdict = jobFilter.filter(task, instanceCount);
-   if (!verdict.isPass()) {
-     throw new ScheduleException(verdict.getReason());
-   }
-
-   int taskLimit = MAX_TASKS_PER_JOB.get();
-   if (instanceCount > taskLimit) {
-     throw new ScheduleException("Job exceeds task limit of " + taskLimit);
-   }
- }
-
- @Override
- public void validateJobResources(SanitizedConfiguration sanitizedConfiguration)
- throws ScheduleException {
-
- IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
- }
-
- /**
-  * Adds new instances to a job, all sharing one task configuration.
-  *
-  * @param jobKey Key of the job to extend.
-  * @param instanceIds Instance IDs to create.
-  * @param config Task configuration used for every new instance.
-  * @throws ScheduleException If the job filters reject the addition, or if any requested
-  *     instance ID is already used by an active task of the job.
-  */
- @Override
- public void addInstances(
-     final IJobKey jobKey,
-     final ImmutableSet<Integer> instanceIds,
-     final ITaskConfig config) throws ScheduleException {
-
-   runJobFilters(jobKey, config, instanceIds.size(), true);
-   storage.write(new MutateWork.NoResult<ScheduleException>() {
-     @Override
-     protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
-       ImmutableSet<IScheduledTask> activeTasks =
-           storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
-
-       // Refuse the whole request if any requested ID is already taken by an active task.
-       Set<Integer> takenIds =
-           FluentIterable.from(activeTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
-       boolean collision = !Sets.intersection(takenIds, instanceIds).isEmpty();
-       if (collision) {
-         throw new ScheduleException("Instance ID collision detected.");
-       }
-
-       stateManager.insertPendingTasks(Maps.asMap(instanceIds, Functions.constant(config)));
-     }
-   });
- }
-
- /**
-  * Triggers an immediate run of an existing cron job.
-  *
-  * @param jobKey Key of the cron job to start.
-  * @throws ScheduleException If the cron scheduler has no job for the key.
-  * @throws TaskDescriptionException Declared by the interface; propagated from the cron
-  *     scheduler.
-  */
- @Override
- public synchronized void startCronJob(IJobKey jobKey)
-     throws ScheduleException, TaskDescriptionException {
-
-   checkNotNull(jobKey);
-
-   if (cronScheduler.hasJob(jobKey)) {
-     cronScheduler.startJobNow(jobKey);
-     return;
-   }
-
-   throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
- }
-
- /**
- * Creates a predicate that will determine whether a job manager has a job matching a job key.
- *
- * @param job Job to match.
- * @return A new predicate matching the job owner and name given.
- */
- private static Predicate<JobManager> managerHasJob(final IJobConfiguration job) {
- return new Predicate<JobManager>() {
- @Override public boolean apply(JobManager manager) {
- return manager.hasJob(job.getKey());
- }
- };
- }
-
- /**
-  * Moves every task matching the query to the given status.
-  *
-  * @param query Builder of the query selecting the tasks to change.
-  * @param status Status to move the tasks into.
-  * @param message Optional audit message recorded with the change.
-  */
- @Override
- public synchronized void setTaskStatus(
-     Query.Builder query,
-     final ScheduleStatus status,
-     Optional<String> message) {
-
-   Query.Builder checkedQuery = checkNotNull(query);
-   ScheduleStatus checkedStatus = checkNotNull(status);
-
-   stateManager.changeState(checkedQuery, checkedStatus, message);
- }
-
- /**
-  * Kills the tasks matching a query on behalf of a user.
-  *
-  * <p>When the query is scoped to exactly one job, every job manager is also asked to delete
-  * that job. Matching tasks are then moved to {@code KILLING} with an audit message naming the
-  * user.
-  *
-  * @param query Builder of the query selecting the tasks to kill.
-  * @param user Name of the user requesting the kill, recorded in the audit message.
-  * @throws ScheduleException If no job was deleted and no task changed state.
-  */
- @Override
- public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
-   checkNotNull(query);
-   LOG.info("Killing tasks matching " + query);
-
-   boolean jobDeleted = false;
-
-   if (Query.isOnlyJobScoped(query)) {
-     // If this looks like a query for all tasks in a job, instruct the scheduler modules to
-     // delete the job.
-     IJobKey jobKey = JobKeys.from(query).get();
-     for (JobManager manager : jobManagers) {
-       if (manager.deleteJob(jobKey)) {
-         jobDeleted = true;
-       }
-     }
-   }
-
-   // Unless statuses were specifically supplied, only attempt to kill active tasks.
-   // The previous condition was inverted: it discarded caller-supplied statuses and left
-   // status-less queries unrestricted.
-   Query.Builder taskQuery = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
-
-   int tasksAffected =
-       stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
-   if (!jobDeleted && (tasksAffected == 0)) {
-     throw new ScheduleException("No jobs to kill");
-   }
- }
-
- @Override
- public void restartShards(
- IJobKey jobKey,
- final Set<Integer> shards,
- final String requestingUser) throws ScheduleException {
-
- if (!JobKeys.isValid(jobKey)) {
- throw new ScheduleException("Invalid job key: " + jobKey);
- }
-
- if (shards.isEmpty()) {
- throw new ScheduleException("At least one shard must be specified.");
- }
-
- final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
- storage.write(new MutateWork.NoResult<ScheduleException>() {
- @Override protected void execute(MutableStoreProvider storeProvider)
- throws ScheduleException {
-
- Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
- if (matchingTasks.size() != shards.size()) {
- throw new ScheduleException("Not all requested shards are active.");
- }
- LOG.info("Restarting shards matching " + query);
- stateManager.changeState(
- Query.taskScoped(Tasks.ids(matchingTasks)),
- RESTARTING,
- Optional.of("Restarted by " + requestingUser));
- }
- });
- }
-
-
- @Override
- public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
- checkNotNull(task);
- checkNotNull(preemptingTask);
- // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
-
- stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
- Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 23ffff9..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.twitter.aurora.scheduler.events.PubsubEvent;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StorageException;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.common.base.Closure;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
- private final Queue<PubsubEvent> events = Lists.newLinkedList();
- /**
-  * Exposes the queue of events that have been added but not yet sent.
-  *
-  * @return The live event queue (not a copy).
-  */
- @VisibleForTesting
- Queue<PubsubEvent> getEvents() {
-   return this.events;
- }
-
- private AtomicBoolean inOperation = new AtomicBoolean(false);
-
- private final Storage storage;
- private final OperationFinalizer operationFinalizer;
- private final Closure<PubsubEvent> taskEventSink;
-
- interface OperationFinalizer {
- /**
- * Performs any work necessary to complete the operation.
- * This is executed in the context of a write operation, immediately after the work
- * executes normally.
- * NOTE: At present, this is executed for every nesting level of operations, rather than
- * at the completion of the top-level operation.
- * See the comment in {@link SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
- * for more detail.
- *
- * @param work Work to finalize.
- * @param storeProvider Mutable store reference.
- */
- void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
- }
-
- /**
-  * Wraps a storage so that write operations can trigger a finalizer and publish events.
-  *
-  * @param storage Underlying storage that performs the reads and writes.
-  * @param operationFinalizer Callback run after each unit of work completes normally.
-  * @param taskEventSink Receiver of the events queued during an operation.
-  */
- SideEffectStorage(
-     Storage storage,
-     OperationFinalizer operationFinalizer,
-     Closure<PubsubEvent> taskEventSink) {
-
-   Storage checkedStorage = checkNotNull(storage);
-   OperationFinalizer checkedFinalizer = checkNotNull(operationFinalizer);
-   Closure<PubsubEvent> checkedSink = checkNotNull(taskEventSink);
-
-   this.storage = checkedStorage;
-   this.operationFinalizer = checkedFinalizer;
-   this.taskEventSink = checkedSink;
- }
-
- /**
-  * Runs a unit of work inside a mutating storage operation, followed by its side effects.
-  * This supports nesting/reentrancy.
-  *
-  * @param work Work to perform.
-  * @param <T> Work return type.
-  * @param <E> Work exception type.
-  * @return The work return value.
-  * @throws E The work exception.
-  */
- <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-   MutateWork<T, E> withSideEffects = executeSideEffectsAfter(work);
-   return storage.write(withSideEffects);
- }
-
- /**
-  * Delegates a read to the underlying storage's {@code consistentRead}.
-  *
-  * @param work Work to perform.
-  * @param <T> Work return type.
-  * @param <E> Work exception type.
-  * @return The work return value.
-  * @throws StorageException Declared by the underlying storage read.
-  * @throws E The work exception.
-  */
- <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-   T readResult = storage.consistentRead(work);
-   return readResult;
- }
-
- /**
- * Work that has side effects external to the storage system.
- * Work may add side effect and pubsub events, which will be executed/sent upon normal
- * completion of the operation.
- *
- * @param <T> Work return type.
- * @param <E> Work exception type.
- */
- abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
- protected final void addTaskEvent(PubsubEvent notice) {
- Preconditions.checkState(inOperation.get());
- events.add(Preconditions.checkNotNull(notice));
- }
- }
-
- /**
- * Work with side effects whose exception type is fixed to {@link RuntimeException}, so it
- * does not throw checked exceptions.
- *
- * @param <T> Work return type.
- */
- abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
- }
-
- /**
-  * Side-effect work that produces no value: subclasses implement {@code execute} and
-  * {@code apply} always returns {@code null}.
-  *
-  * @param <E> Work exception type.
-  */
- abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-   abstract void execute(MutableStoreProvider storeProvider) throws E;
-
-   @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-     this.execute(storeProvider);
-     return null;
-   }
- }
-
- /**
- * Work with side effects which neither throws checked exceptions (its exception type is
- * fixed to {@link RuntimeException}) nor has a return value.
- */
- abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
- }
-
- private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
- final SideEffectWork<T, E> work) {
-
- return new MutateWork<T, E>() { // wraps the work so the finalizer and event delivery run after it
- @Override public T apply(MutableStoreProvider storeProvider) throws E {
- boolean topLevelOperation = inOperation.compareAndSet(false, true); // true only if no operation was already in progress
-
- try {
- T result = work.apply(storeProvider);
-
- // TODO(William Farner): Maintaining this since it matches prior behavior, but this
- // seems wrong. Double-check whether this is necessary, or if only the top-level
- // operation should be executing the finalizer. Update doc on OperationFinalizer
- // once this is assessed.
- operationFinalizer.finalize(work, storeProvider); // runs at every nesting level, not only the outermost
- if (topLevelOperation) { // only the outermost operation delivers the queued events
- while (!events.isEmpty()) {
- taskEventSink.execute(events.remove());
- }
- }
- return result;
- } finally { // NOTE(review): if the work or finalizer throws, queued events are neither sent nor cleared here
- if (topLevelOperation) { // only the invocation that set the flag resets it
- inOperation.set(false);
- }
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
deleted file mode 100644
index 099ec70..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManager.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.SlaveID;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Thin interface for the state manager.
- */
-public interface StateManager {
-
- /**
- * Performs a simple state change, transitioning all tasks matching a query to the given
- * state and applying the given audit message.
- * TODO(William Farner): Consider removing the return value.
- *
- * @param query Builder of the query to perform, the results of which will be modified.
- * @param newState State to move the resulting tasks into.
- * @param auditMessage Audit message to apply along with the state change.
- * @return the number of successful state changes.
- */
- int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
-
- /**
- * Assigns a task to a specific slave.
- * This will modify the task record to reflect the host assignment and return the updated record.
- *
- * @param taskId ID of the task to mutate.
- * @param slaveHost Host name that the task is being assigned to.
- * @param slaveId ID of the slave that the task is being assigned to.
- * @param assignedPorts Ports on the host that are being assigned to the task.
- * @return The updated task record, or {@code null} if the task was not found.
- */
- IAssignedTask assignTask(
- String taskId,
- String slaveHost,
- SlaveID slaveId,
- Set<Integer> assignedPorts);
-
- /**
- * Inserts new tasks into the store. Tasks will immediately move into PENDING and will be eligible
- * for scheduling.
- *
- * @param tasks Tasks to insert, mapped by their instance IDs.
- */
- void insertPendingTasks(Map<Integer, ITaskConfig> tasks);
-
- /**
- * Deletes records of tasks from the task store.
- * This will not perform any state checking or state transitions, but will immediately remove
- * the tasks from the store. It will also silently ignore attempts to delete task IDs that do
- * not exist.
- *
- * @param taskIds IDs of tasks to delete.
- */
- void deleteTasks(final Set<String> taskIds);
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
deleted file mode 100644
index 37d13f4..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-
-import org.apache.mesos.Protos.SlaveID;
-
-import com.twitter.aurora.gen.AssignedTask;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.scheduler.Driver;
-import com.twitter.aurora.scheduler.TaskIdGenerator;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent;
-import com.twitter.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.base.Closure;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Iterables.transform;
-
-import static com.twitter.aurora.gen.ScheduleStatus.INIT;
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.UNKNOWN;
-import static com.twitter.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Manager of all persistence-related operations for the scheduler. Acts as a controller for
- * persisted state machine transitions, and their side-effects.
- *
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
- */
-public class StateManagerImpl implements StateManager {
- private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
-
- private final SideEffectStorage storage;
- /**
-  * Exposes the side-effect storage wrapper used by this manager.
-  *
-  * @return The storage wrapper created in the constructor.
-  */
- @VisibleForTesting
- SideEffectStorage getStorage() {
-   return this.storage;
- }
-
- private final TaskIdGenerator taskIdGenerator;
-
- // Work queue to receive state machine side effect work.
- // Items are sorted to place DELETE entries last. This is to ensure that within an operation,
- // a delete is always processed after a state transition.
- private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10, // 10 is the initial capacity
- new Comparator<WorkEntry>() {
- @Override public int compare(WorkEntry a, WorkEntry b) {
- if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) { // exactly one of the two is a DELETE
- return (a.command == WorkCommand.DELETE) ? 1 : -1; // the DELETE entry sorts after the other one
- } else {
- return 0; // both DELETE or neither: the comparator imposes no relative order
- }
- }
- });
-
- // Adapt the work queue into a sink.
- private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
- @Override public void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- workQueue.add(new WorkEntry(work, stateMachine, mutation));
- }
- };
-
- // Turns an (instance ID, task config) pair into a new scheduled task in the INIT state, with a
- // freshly generated task ID.
- private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
-     new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
-       @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
-         ITaskConfig config = entry.getValue();
-
-         AssignedTask assigned = new AssignedTask();
-         assigned.setTaskId(taskIdGenerator.generate(config, entry.getKey()));
-         assigned.setInstanceId(entry.getKey());
-         assigned.setTask(config.newBuilder());
-
-         ScheduledTask scheduled = new ScheduledTask();
-         scheduled.setStatus(INIT);
-         scheduled.setAssignedTask(assigned);
-         return IScheduledTask.build(scheduled);
-       }
-     };
-
- private final Driver driver;
- private final Clock clock;
-
- /**
-  * One queued unit of state machine work: the command, the state machine that issued it and
-  * the task mutation that accompanies it.
-  */
- private static class WorkEntry {
-   private final WorkCommand command;
-   private final TaskStateMachine stateMachine;
-   private final Function<IScheduledTask, IScheduledTask> mutation;
-
-   WorkEntry(
-       WorkCommand command,
-       TaskStateMachine stateMachine,
-       Function<IScheduledTask, IScheduledTask> mutation) {
-
-     this.mutation = mutation;
-     this.stateMachine = stateMachine;
-     this.command = command;
-   }
- }
-
- @Inject
- StateManagerImpl(
- final Storage storage,
- final Clock clock,
- Driver driver,
- TaskIdGenerator taskIdGenerator,
- Closure<PubsubEvent> taskEventSink) {
-
- checkNotNull(storage);
- this.clock = checkNotNull(clock);
-
- OperationFinalizer finalizer = new OperationFinalizer() {
- @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
- processWorkQueueInWriteOperation(work, store);
- }
- };
-
- this.storage = new SideEffectStorage(storage, finalizer, taskEventSink);
-
- this.driver = checkNotNull(driver);
- this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
- Stats.exportSize("work_queue_depth", workQueue);
- }
-
- @Override
- public void insertPendingTasks(final Map<Integer, ITaskConfig> tasks) {
- checkNotNull(tasks);
-
- // Done outside the write transaction to minimize the work done inside a transaction.
- final Set<IScheduledTask> scheduledTasks =
- ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
-
- storage.write(storage.new NoResultQuietSideEffectWork() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
-
- for (IScheduledTask task : scheduledTasks) {
- createStateMachine(task).updateState(PENDING);
- }
- }
- });
- }
-
- /**
-  * Moves every task matching the query to a new state, recording an audit message.
-  *
-  * @param query Builder of the query selecting the tasks to change.
-  * @param newState State to move the tasks into.
-  * @param auditMessage Audit message to apply along with the state change.
-  * @return The number of state machines that reported a successful change.
-  */
- @Override
- public int changeState(
-     Query.Builder query,
-     final ScheduleStatus newState,
-     final Optional<String> auditMessage) {
-
-   Function<TaskStateMachine, Boolean> transition = new Function<TaskStateMachine, Boolean>() {
-     @Override public Boolean apply(TaskStateMachine machine) {
-       return machine.updateState(newState, auditMessage);
-     }
-   };
-   return changeState(query, transition);
- }
-
- /**
-  * Assigns a task to a slave and moves it to {@code ASSIGNED}.
-  *
-  * @param taskId ID of the task to mutate; must not be blank.
-  * @param slaveHost Host name the task is assigned to; must not be blank.
-  * @param slaveId ID of the slave the task is assigned to.
-  * @param assignedPorts Ports on the host that are assigned to the task.
-  * @return The assigned task captured by the mutation, or {@code null} if none was captured.
-  */
- @Override
- public IAssignedTask assignTask(
-     String taskId,
-     String slaveHost,
-     SlaveID slaveId,
-     Set<Integer> assignedPorts) {
-
-   checkNotBlank(taskId);
-   checkNotBlank(slaveHost);
-   checkNotNull(assignedPorts);
-
-   TaskAssignMutation assignment = assignHost(slaveHost, slaveId, assignedPorts);
-   Query.Builder singleTask = Query.taskScoped(taskId);
-   changeState(singleTask, assignment);
-
-   return assignment.getAssignedTask();
- }
-
- /**
-  * Applies a state change to the state machine of each given task and counts the successes.
-  *
-  * @param taskIds IDs of the tasks to change.
-  * @param stateChange Change to apply; returns whether it succeeded.
-  * @return The number of state machines for which the change returned {@code true}.
-  */
- private int changeStateInWriteOperation(
-     Set<String> taskIds,
-     Function<TaskStateMachine, Boolean> stateChange) {
-
-   int changed = 0;
-   for (TaskStateMachine machine : getStateMachines(taskIds).values()) {
-     changed += stateChange.apply(machine) ? 1 : 0;
-   }
-   return changed;
- }
-
- /**
-  * Runs a query inside a write operation and applies a state change to every matching task.
-  *
-  * @param query Builder of the query selecting the tasks to change.
-  * @param stateChange Change to apply to each matching task's state machine.
-  * @return The number of successful state changes.
-  */
- private int changeState(
-     final Query.Builder query,
-     final Function<TaskStateMachine, Boolean> stateChange) {
-
-   return storage.write(storage.new QuietSideEffectWork<Integer>() {
-     @Override public Integer apply(MutableStoreProvider storeProvider) {
-       Iterable<IScheduledTask> matches = storeProvider.getTaskStore().fetchTasks(query);
-       Set<String> matchedIds =
-           FluentIterable.from(matches).transform(Tasks.SCHEDULED_TO_ID).toSet();
-       return changeStateInWriteOperation(matchedIds, stateChange);
-     }
-   });
- }
-
- private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> { // a state change that also exposes the task it assigned
- IAssignedTask getAssignedTask(); // in assignHost's implementation this is null until the mutation has run
- }
-
- /**
-  * Pairs each requested port name with one of the allocated ports.
-  *
-  * @param portNames Names of the ports the task requested.
-  * @param allocatedPorts Port numbers that were allocated to the task.
-  * @return A map from port name to the port number chosen for it.
-  * @throws IllegalArgumentException If there are fewer allocated ports than port names.
-  */
- private static Map<String, Integer> getNameMappedPorts(
-     Set<String> portNames,
-     Set<Integer> allocatedPorts) {
-
-   Preconditions.checkNotNull(portNames);
-
-   // Ports are removed from this copy as they are handed out, so whatever is left at the end
-   // was allocated but never used.
-   Map<String, Integer> mapped = Maps.newHashMap();
-   Set<Integer> unusedPorts = Sets.newHashSet(allocatedPorts);
-   Iterator<Integer> nextPort = Iterables.consumingIterable(unusedPorts).iterator();
-
-   Iterator<String> names = portNames.iterator();
-   while (names.hasNext()) {
-     String portName = names.next();
-     Preconditions.checkArgument(nextPort.hasNext(),
-         "Allocated ports %s were not sufficient to expand task.", allocatedPorts);
-     int portNumber = nextPort.next();
-     mapped.put(portName, portNumber);
-   }
-
-   if (!unusedPorts.isEmpty()) {
-     LOG.warning("Not all allocated ports were used to map ports!");
-   }
-
-   return mapped;
- }
-
- private TaskAssignMutation assignHost(
- final String slaveHost,
- final SlaveID slaveId,
- final Set<Integer> assignedPorts) {
-
- final TaskMutation mutation = new TaskMutation() { // sets named ports, host and slave ID on the task's builder
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- AssignedTask assigned = builder.getAssignedTask();
- assigned.setAssignedPorts(
- getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts)); // pair each requested port name with an offered port
- assigned.setSlaveHost(slaveHost)
- .setSlaveId(slaveId.getValue());
- return IScheduledTask.build(builder);
- }
- };
-
- return new TaskAssignMutation() {
- private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference(); // holds null until the wrapper below runs
- @Override public IAssignedTask getAssignedTask() {
- return assignedTask.get();
- }
-
- @Override public Boolean apply(final TaskStateMachine stateMachine) {
- TaskMutation wrapper = new TaskMutation() { // applies the mutation and remembers its result
- @Override public IScheduledTask apply(IScheduledTask task) {
- IScheduledTask mutated = mutation.apply(task);
- Preconditions.checkState(
- assignedTask.compareAndSet(null, mutated.getAssignedTask()), // fails if a result was already recorded
- "More than one result was found for an identity query.");
- return mutated;
- }
- };
- return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
- }
- };
- }
-
- private void processWorkQueueInWriteOperation(
- SideEffectWork<?, ?> sideEffectWork,
- MutableStoreProvider storeProvider) {
- // Drains workQueue, carrying out each queued command; task events go to sideEffectWork.
- for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
- final TaskStateMachine stateMachine = work.stateMachine;
- // KILL is sent to the driver; every other command operates on the task store.
- if (work.command == WorkCommand.KILL) {
- driver.killTask(stateMachine.getTaskId());
- } else {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- String taskId = stateMachine.getTaskId();
- Query.Builder idQuery = Query.taskScoped(taskId);
-
- switch (work.command) {
- case RESCHEDULE: // Save a copy of the task under a new ID, without slave, ports or events.
- ScheduledTask builder =
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
- builder.getAssignedTask().unsetSlaveId();
- builder.getAssignedTask().unsetSlaveHost();
- builder.getAssignedTask().unsetAssignedPorts();
- builder.unsetTaskEvents();
- builder.setAncestorId(taskId); // Links the copy back to the task it replaces.
- String newTaskId = taskIdGenerator.generate(
- ITaskConfig.build(builder.getAssignedTask().getTask()),
- builder.getAssignedTask().getInstanceId());
- builder.getAssignedTask().setTaskId(newTaskId);
-
- LOG.info("Task being rescheduled: " + taskId);
-
- IScheduledTask task = IScheduledTask.build(builder);
- taskStore.saveTasks(ImmutableSet.of(task));
- // The copy gets its own state machine, which is moved to PENDING right away.
- createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
- ITaskConfig taskInfo = task.getAssignedTask().getTask();
- sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskRescheduled(
- taskInfo.getOwner().getRole(),
- taskInfo.getJobName(),
- task.getAssignedTask().getInstanceId()));
- break;
-
- case UPDATE_STATE: // Store the state machine's status, then apply the queued mutation.
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return work.mutation.apply(
- IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
- }
- });
- sideEffectWork.addTaskEvent( // The event carries the task as re-read after the mutation.
- new PubsubEvent.TaskStateChange(
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
- stateMachine.getPreviousState()));
- break;
-
- case DELETE:
- deleteTasks(ImmutableSet.of(taskId));
- break;
-
- case INCREMENT_FAILURES: // Bump the stored failure count by one.
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return IScheduledTask.build(
- task.newBuilder().setFailureCount(task.getFailureCount() + 1));
- }
- });
- break;
-
- default: // Unknown commands are logged and otherwise ignored.
- LOG.severe("Unrecognized work command type " + work.command);
- }
- }
- }
- }
-
- /**
-  * Deletes tasks from the task store inside a storage write. A
-  * {@link PubsubEvent.TasksDeleted} event carrying the tasks fetched for those IDs is queued
-  * before the delete is issued.
-  *
-  * @param taskIds IDs of the tasks to delete.
-  */
- @Override
- public void deleteTasks(final Set<String> taskIds) {
-   storage.write(storage.new NoResultQuietSideEffectWork() {
-     @Override protected void execute(final MutableStoreProvider storeProvider) {
-       TaskStore.Mutable mutableStore = storeProvider.getUnsafeTaskStore();
-       // Snapshot the tasks first so the event can describe what is about to be removed.
-       Iterable<IScheduledTask> doomed = mutableStore.fetchTasks(Query.taskScoped(taskIds));
-       PubsubEvent.TasksDeleted deletion =
-           new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(doomed));
-       addTaskEvent(deletion);
-       mutableStore.deleteTasks(taskIds);
-     }
-   });
- }
-
- /**
-  * Creates a state machine for each requested task ID, using a consistent read of storage.
-  *
-  * @param taskIds IDs of the tasks to build state machines for.
-  * @return An immutable map from every requested ID to its state machine. IDs with no stored
-  *     task are still present; their state machine is built from a {@code null} task.
-  */
- private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
-   return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
-     @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
-       Function<IScheduledTask, String> toTaskId = new Function<IScheduledTask, String>() {
-         @Override public String apply(IScheduledTask input) {
-           return input.getAssignedTask().getTaskId();
-         }
-       };
-       Map<String, IScheduledTask> storedById = Maps.uniqueIndex(
-           storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
-           toTaskId);
-
-       ImmutableMap.Builder<String, TaskStateMachine> machines = ImmutableMap.builder();
-       for (String id : taskIds) {
-         // A missing entry yields null, which is handed on to getStateMachine as-is.
-         machines.put(id, getStateMachine(id, storedById.get(id)));
-       }
-       return machines.build();
-     }
-   });
- }
-
- /**
-  * Builds the state machine for a single task.
-  *
-  * @param taskId ID of the task.
-  * @param task The stored task, or {@code null} when no task was found for the ID.
-  * @return A state machine starting at the task's stored status, or, for a {@code null} task,
-  *     one created at INIT and then sent an UNKNOWN update.
-  */
- private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-   if (task == null) {
-     // The task is unknown, not present in storage.
-     TaskStateMachine unknownTask = new TaskStateMachine(taskId, null, workSink, clock, INIT);
-     unknownTask.updateState(UNKNOWN);
-     return unknownTask;
-   }
-
-   return createStateMachine(task, task.getStatus());
- }
-
- /**
-  * Creates a state machine for a task, starting in the INIT state.
-  *
-  * @param task Task the state machine will manage.
-  * @return The new state machine.
-  */
- private TaskStateMachine createStateMachine(IScheduledTask task) {
-   ScheduleStatus start = INIT;
-   return createStateMachine(task, start);
- }
-
- /**
-  * Creates a state machine for a task, wired to this object's work sink and clock.
-  *
-  * @param task Task the state machine will manage.
-  * @param initialState State the state machine starts in.
-  * @return The new state machine.
-  */
- private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-   String id = Tasks.id(task);
-   return new TaskStateMachine(id, task, workSink, clock, initialState);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java b/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
deleted file mode 100644
index 870d085..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateModule.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-
-import com.twitter.aurora.scheduler.MesosTaskFactory;
-import com.twitter.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
-import com.twitter.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import com.twitter.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
-
-/**
- * Binding module for scheduling logic and higher-level state management.
- * <p>
- * Most interfaces are bound to an implementation class which is then scoped as a singleton in a
- * separate statement. {@code MesosTaskFactory} is bound to its implementation without a scope
- * being declared here.
- */
-public class StateModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(TaskAssigner.class).to(TaskAssignerImpl.class);
- bind(TaskAssignerImpl.class).in(Singleton.class);
- bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class); // No scope declared here.
-
- bind(SchedulerCore.class).to(SchedulerCoreImpl.class).in(Singleton.class);
-
- bind(StateManager.class).to(StateManagerImpl.class);
- bind(StateManagerImpl.class).in(Singleton.class);
-
- bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
- bind(UUIDGeneratorImpl.class).in(Singleton.class);
- bind(LockManager.class).to(LockManagerImpl.class);
- bind(LockManagerImpl.class).in(Singleton.class);
-
- bindCronJobManager(binder());
- bind(ImmediateJobManager.class).in(Singleton.class);
-
- bindMaintenanceController(binder());
- }
- // Scopes CronJobManager as a singleton and passes it to PubsubEventModule.bindSubscriber.
- @VisibleForTesting
- static void bindCronJobManager(Binder binder) {
- binder.bind(CronJobManager.class).in(Singleton.class);
- PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
- }
- // Binds MaintenanceController to a singleton implementation and registers that as a subscriber.
- @VisibleForTesting
- static void bindMaintenanceController(Binder binder) {
- binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
- binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
- PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
deleted file mode 100644
index c37feae..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskAssigner.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.scheduler.MesosTaskFactory;
-import com.twitter.aurora.scheduler.ResourceSlot;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Responsible for matching a task against an offer.
- */
-public interface TaskAssigner {
-
- /**
- * Tries to match a task against an offer. If a match is found, the assigner should
- * make the appropriate changes to the task and provide a non-empty result.
- *
- * @param offer The resource offer.
- * @param task The task to match against and optionally assign.
- * @return Instructions for launching the task if matching and assignment were successful.
- */
- Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
-
- class TaskAssignerImpl implements TaskAssigner {
- private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
-
- private final StateManager stateManager;
- private final SchedulingFilter filter;
- private final MesosTaskFactory taskFactory;
-
- @Inject
- public TaskAssignerImpl(
- StateManager stateManager,
- SchedulingFilter filter,
- MesosTaskFactory taskFactory) {
-
- this.stateManager = checkNotNull(stateManager);
- this.filter = checkNotNull(filter);
- this.taskFactory = checkNotNull(taskFactory);
- }
-
- private TaskInfo assign(Offer offer, IScheduledTask task) {
- String host = offer.getHostname();
- Set<Integer> selectedPorts =
- Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
- IAssignedTask assigned = stateManager.assignTask(
- Tasks.id(task),
- host,
- offer.getSlaveId(),
- selectedPorts);
- LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
- host, offer.getSlaveId(), Tasks.id(task)));
- return taskFactory.createFrom(assigned, offer.getSlaveId());
- }
-
- @Override
- public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
- Set<Veto> vetoes = filter.filter(
- ResourceSlot.from(offer),
- offer.getHostname(),
- task.getAssignedTask().getTask(),
- Tasks.id(task));
- if (vetoes.isEmpty()) {
- return Optional.of(assign(offer, task));
- } else {
- LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
- + ": " + vetoes);
- return Optional.absent();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
deleted file mode 100644
index f32fd14..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.TaskEvent;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.StateMachine;
-import com.twitter.common.util.StateMachine.Rule;
-import com.twitter.common.util.StateMachine.Transition;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * State machine for a task.
- * <p>
- * This enforces the lifecycle of a task, and triggers the actions that should be taken in response
- * to different state transitions. These responses are externally communicated by populating a
- * provided work queue.
- * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- * abstractly from the consumption side.
- */
-class TaskStateMachine {
- private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
-
- private static final AtomicLong ILLEGAL_TRANSITIONS =
- Stats.exportLong("scheduler_illegal_task_state_transitions");
-
- // Re-declarations of statuses as wrapped state objects.
- private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
- private static final State FAILED = State.create(ScheduleStatus.FAILED);
- private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
- private static final State INIT = State.create(ScheduleStatus.INIT);
- private static final State KILLED = State.create(ScheduleStatus.KILLED);
- private static final State KILLING = State.create(ScheduleStatus.KILLING);
- private static final State LOST = State.create(ScheduleStatus.LOST);
- private static final State PENDING = State.create(ScheduleStatus.PENDING);
- private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
- private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
- private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
- private static final State STARTING = State.create(ScheduleStatus.STARTING);
- private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
- @VisibleForTesting
- static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
- new Supplier<String>() { // memoize() caches the host name after the first successful get().
- @Override public String get() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- LOG.log(Level.SEVERE, "Failed to get self hostname."); // The exception itself is not logged.
- throw Throwables.propagate(e); // Rethrown as an unchecked exception.
- }
- }
- });
-
- private final String taskId;
- private final WorkSink workSink;
- private final StateMachine<State> stateMachine;
- private ScheduleStatus previousState = null;
- private final Clock clock;
-
- /**
-  * Pairs a schedule status with a state change argument (a task mutation). Two States are
-  * equal when their ScheduleStatuses are equal; the mutation plays no part in equality or
-  * hashing.
-  */
- private static class State {
-   private final ScheduleStatus state;
-   private final Function<IScheduledTask, IScheduledTask> mutation;
-
-   State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-     this.state = state;
-     this.mutation = mutation;
-   }
-
-   /** Creates a state whose mutation leaves the task unchanged. */
-   static State create(ScheduleStatus status) {
-     Function<IScheduledTask, IScheduledTask> noop = Functions.identity();
-     return create(status, noop);
-   }
-
-   /** Creates a state carrying the given mutation. */
-   static State create(
-       ScheduleStatus status,
-       Function<IScheduledTask, IScheduledTask> mutation) {
-
-     return new State(status, mutation);
-   }
-
-   @Override
-   public boolean equals(Object o) {
-     if (o == this) {
-       return true;
-     }
-     return (o instanceof State) && (state == ((State) o).state);
-   }
-
-   @Override
-   public int hashCode() {
-     HashCodeBuilder hash = new HashCodeBuilder();
-     hash.append(state);
-     return hash.toHashCode();
-   }
-
-   @Override
-   public String toString() {
-     return state.toString();
-   }
-
-   private ScheduleStatus getState() {
-     return state;
-   }
-
-   private Function<IScheduledTask, IScheduledTask> getMutation() {
-     return mutation;
-   }
- }
-
- /**
- * A write-only work acceptor. The state machine hands every work command it generates to the
- * sink it was constructed with.
- */
- public interface WorkSink {
- /**
- * Appends external work that must be performed for a state machine transition to be fully
- * complete.
- *
- * @param work Description of the work to be performed.
- * @param stateMachine The state machine that the work is associated with.
- * @param mutation Mutate operation to perform along with the state transition. The state
- * machine passes an identity function when the command has no mutation of its own.
- */
- void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation);
- }
-
- /**
- * Creates a new task state machine.
- *
- * @param taskId ID of the task managed by this state machine. Validated with
- * {@code MorePreconditions.checkNotBlank}.
- * @param task Read-only task that this state machine manages. It is not null-checked here,
- * but the callbacks for transitions into FINISHED and FAILED dereference it.
- * @param workSink Work sink to receive transition response actions. Must not be null.
- * @param clock Clock to use for reading the current time. Must not be null.
- * @param initialState The state to begin the state machine at. All legal transitions will be
- * added, but this allows the state machine to 'skip' states, for instance when a task is
- * loaded from a persistent store. Must not be null.
- */
- public TaskStateMachine(
- final String taskId,
- final IScheduledTask task,
- final WorkSink workSink,
- final Clock clock,
- final ScheduleStatus initialState) {
-
- this.taskId = MorePreconditions.checkNotBlank(taskId);
- this.workSink = checkNotNull(workSink);
- this.clock = checkNotNull(clock);
- checkNotNull(initialState);
- // Callback shared by the FINISHED, FAILED, KILLED, KILLING, LOST and UNKNOWN rules below.
- @SuppressWarnings("unchecked")
- Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
- /* Kill a task that we believe to be terminated when an attempt is made to revive. */
- Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
- addWorkClosure(WorkCommand.KILL)),
- /* Remove a terminated task that is remotely removed. */
- Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
- // Callback shared by the PREEMPTING and RESTARTING rules below.
- final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case ASSIGNED:
- case STARTING:
- case RUNNING:
- addWork(WorkCommand.KILL);
- break;
-
- case LOST:
- addWork(WorkCommand.KILL);
- // fall through: a lost task is rescheduled as well as killed
-
- case FINISHED:
- case FAILED:
- case KILLED:
- addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
- break;
-
- case UNKNOWN:
- updateState(ScheduleStatus.LOST);
- break;
-
- default:
- // No-op.
- }
- }
- };
-
- // To be called on a task transitioning into the FINISHED state.
- final Command rescheduleIfService = new Command() {
- @Override public void execute() {
- if (task.getAssignedTask().getTask().isIsService()) {
- addWork(WorkCommand.RESCHEDULE);
- }
- }
- };
-
- // To be called on a task transitioning into the FAILED state.
- final Command incrementFailuresMaybeReschedule = new Command() {
- @Override public void execute() {
- addWork(WorkCommand.INCREMENT_FAILURES);
-
- // Max failures is ignored for service task.
- boolean isService = task.getAssignedTask().getTask().isIsService();
-
- // Max failures is ignored when set to -1.
- int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
- if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
- addWork(WorkCommand.RESCHEDULE);
- } else {
- LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
- }
- }
- };
- // Transition table: each rule names the states reachable from one state and, optionally, a callback.
- stateMachine = StateMachine.<State>builder(taskId)
- .logTransitions()
- .initialState(State.create(initialState))
- .addState(
- Rule.from(INIT)
- .to(PENDING, UNKNOWN))
- .addState(
- Rule.from(PENDING)
- .to(ASSIGNED, KILLING)
- .withCallback(
- new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case KILLING: // Killing a pending task queues its deletion.
- addWork(WorkCommand.DELETE);
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
- .addState(
- Rule.from(ASSIGNED)
- .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
- KILLING, LOST, PREEMPTING)
- .withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case FINISHED:
- rescheduleIfService.execute();
- break;
-
- case PREEMPTING:
- addWork(WorkCommand.KILL);
- break;
-
- case FAILED:
- incrementFailuresMaybeReschedule.execute();
- break;
-
- case RESTARTING:
- addWork(WorkCommand.KILL);
- break;
-
- case KILLED:
- addWork(WorkCommand.RESCHEDULE);
- break;
-
- case LOST:
- addWork(WorkCommand.RESCHEDULE);
- // fall through: from ASSIGNED, LOST also queues a KILL
- case KILLING:
- addWork(WorkCommand.KILL);
- break;
-
- case UNKNOWN:
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
- .addState(
- Rule.from(STARTING)
- .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
- .withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case FINISHED:
- rescheduleIfService.execute();
- break;
-
- case RESTARTING:
- addWork(WorkCommand.KILL);
- break;
-
- case PREEMPTING:
- addWork(WorkCommand.KILL);
- break;
-
- case FAILED:
- incrementFailuresMaybeReschedule.execute();
- break;
-
- case KILLED:
- addWork(WorkCommand.RESCHEDULE);
- break;
-
- case KILLING:
- addWork(WorkCommand.KILL);
- break;
-
- case LOST:
- addWork(WorkCommand.RESCHEDULE);
- break;
-
- case UNKNOWN:
- // The slave previously acknowledged that it had the task, and now
- // stopped reporting it.
- updateState(ScheduleStatus.LOST);
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
- .addState(
- Rule.from(RUNNING)
- .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
- .withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case FINISHED:
- rescheduleIfService.execute();
- break;
-
- case PREEMPTING:
- addWork(WorkCommand.KILL);
- break;
-
- case RESTARTING:
- addWork(WorkCommand.KILL);
- break;
-
- case FAILED:
- incrementFailuresMaybeReschedule.execute();
- break;
-
- case KILLED:
- addWork(WorkCommand.RESCHEDULE);
- break;
-
- case KILLING:
- addWork(WorkCommand.KILL);
- break;
-
- case LOST:
- addWork(WorkCommand.RESCHEDULE);
- break;
-
- case UNKNOWN:
- updateState(ScheduleStatus.LOST);
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
- .addState(
- Rule.from(FINISHED)
- .to(UNKNOWN)
- .withCallback(manageTerminatedTasks))
- .addState(
- Rule.from(PREEMPTING)
- .to(FINISHED, FAILED, KILLING, KILLED, LOST)
- .withCallback(manageRestartingTask))
- .addState(
- Rule.from(RESTARTING)
- .to(FINISHED, FAILED, KILLING, KILLED, LOST)
- .withCallback(manageRestartingTask))
- .addState(
- Rule.from(FAILED)
- .to(UNKNOWN)
- .withCallback(manageTerminatedTasks))
- .addState(
- Rule.from(KILLED)
- .to(UNKNOWN)
- .withCallback(manageTerminatedTasks))
- .addState(
- Rule.from(KILLING)
- .to(FINISHED, FAILED, KILLED, LOST, UNKNOWN)
- .withCallback(manageTerminatedTasks))
- .addState(
- Rule.from(LOST)
- .to(UNKNOWN)
- .withCallback(manageTerminatedTasks))
- .addState(
- Rule.from(UNKNOWN)
- .noTransitions()
- .withCallback(manageTerminatedTasks))
- // Since we want this action to be performed last in the transition sequence, the callback
- // must be the last chained transition callback.
- .onAnyTransition(
- new Closure<Transition<State>>() {
- @Override public void execute(final Transition<State> transition) {
- ScheduleStatus from = transition.getFrom().getState();
- ScheduleStatus to = transition.getTo().getState();
-
- if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
- // Prevent an update when killing a pending task, since the task is deleted
- // prior to the update.
- && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
- addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
- } else if (!transition.isAllowed()) {
- LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
- ILLEGAL_TRANSITIONS.incrementAndGet();
- }
- // Remember where a valid transition came from; exposed through getPreviousState().
- if (transition.isValidStateChange()) {
- previousState = from;
- }
- }
- }
- )
- // TODO(wfarner): Consider alternatives to allow exceptions to surface. This would allow
- // the state machine to surface illegal state transitions and propagate better information
- // to the caller. As it stands, the caller must implement logic that really belongs in
- // the state machine. For example, preventing RESTARTING->UPDATING transitions
- // (or for that matter, almost any user-initiated state transition) is awkward.
- .throwOnBadTransition(false)
- .build();
- }
-
- /**
-  * Wraps a work command in a closure that queues the command each time it is executed. The
-  * transition passed to the closure is not inspected.
-  *
-  * @param work Command to queue.
-  * @return A closure that queues {@code work} without a task mutation of its own.
-  */
- private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-   Closure<Transition<State>> enqueue = new Closure<Transition<State>>() {
-     @Override public void execute(Transition<State> unusedTransition) {
-       addWork(work);
-     }
-   };
-   return enqueue;
- }
-
- /**
-  * Queues a work command that carries no task mutation of its own.
-  *
-  * @param work Command to queue.
-  */
- private void addWork(WorkCommand work) {
-   Function<IScheduledTask, IScheduledTask> noop = Functions.identity();
-   addWork(work, noop);
- }
-
- /**
-  * Logs a work command and hands it, with this state machine and a mutation, to the work sink.
-  *
-  * @param work Command to queue.
-  * @param mutation Task mutation to pass along with the command.
-  */
- private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-   String description = "Adding work command " + work + " for " + this;
-   LOG.info(description);
-   workSink.addWork(work, this, mutation);
- }
-
- /**
- * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status) {
- return updateState(status, Functions.<IScheduledTask>identity());
- }
-
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @param auditMessage The (optional) audit message to associate with the transition.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
- return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
- }
-
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
- *
- * @param status Status to apply to the task.
- * @param mutation Mutate operation to perform while updating the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(
- ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- return updateState(status, mutation, Optional.<String>absent());
- }
-
- /**
- * Attempt to transition the state machine to the provided state.
- * At the time this method returns, any work commands required to satisfy the state transition
- * will be appended to the work queue.
- * <p>
- * The mutation handed to the state machine first appends a task event (timestamp, status,
- * audit message and scheduler host) and then applies {@code mutation}.
- *
- * @param status Status to apply to the task.
- * @param mutation Mutate operation to perform while updating the task.
- * @param auditMessage The audit message to associate with the transition.
- * @return {@code true} if the state change was allowed, {@code false} otherwise. Also
- * {@code false}, without attempting a transition, when {@code status} is already the
- * current state.
- */
- public synchronized boolean updateState(
- final ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation,
- final Optional<String> auditMessage) {
-
- checkNotNull(status);
- checkNotNull(mutation);
- checkNotNull(auditMessage);
-
- /*
- * Don't bother applying noop state changes. If we end up modifying task state without a
- * state transition (e.g. storing resource consumption of a running task), we need to find
- * a different way to suppress noop transitions.
- */
- if (stateMachine.getState().getState() != status) {
- Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
- new Function<IScheduledTask, IScheduledTask>() { // Runs before mutation.
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- builder.addToTaskEvents(new TaskEvent()
- .setTimestamp(clock.nowMillis())
- .setStatus(status)
- .setMessage(auditMessage.orNull()) // An absent audit message is stored as null.
- .setScheduler(LOCAL_HOST_SUPPLIER.get()));
- return IScheduledTask.build(builder);
- }
- });
- return stateMachine.transition(State.create(status, operation));
- }
-
- return false;
- }
-
- /**
- * Fetches the schedule status that the underlying state machine is currently in.
- *
- * @return The current state.
- */
- public synchronized ScheduleStatus getState() {
- return stateMachine.getState().getState();
- }
-
- /**
- * Gets the ID for the task that this state machine manages. The ID is set once, at
- * construction.
- *
- * @return The state machine's task ID.
- */
- public String getTaskId() {
- return taskId;
- }
-
- /**
-  * Gets the previous state of this state machine.
-  * <p>
-  * Synchronized like {@link #getState()} and the {@code updateState} methods: the previous
-  * state is written while a transition runs under this object's monitor, so reading it under
-  * the same monitor guarantees that a caller on another thread sees the latest value.
-  *
-  * @return The state machine's previous state, or {@code null} if the state machine has not
-  *     transitioned since being created.
-  */
- @Nullable
- synchronized ScheduleStatus getPreviousState() {
-   return previousState;
- }
-
- @Override
- public String toString() {
- return getTaskId(); // The task ID is the string form, e.g. in the log line built by addWork.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java b/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
deleted file mode 100644
index d8de19c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/UUIDGenerator.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.twitter.aurora.scheduler.state;
-
-import java.util.UUID;
-
-/**
- * Source of {@link UUID}s, abstracted behind an interface so that unit tests can substitute
- * their own implementation for {@link java.util.UUID#randomUUID()}.
- */
-interface UUIDGenerator {
-  /**
-   * Produces a new identifier.
-   *
-   * @return A newly generated UUID.
-   */
-  UUID createNew();
-
-  /**
-   * Default implementation, which delegates to {@link UUID#randomUUID()}.
-   */
-  class UUIDGeneratorImpl implements UUIDGenerator {
-    @Override
-    public UUID createNew() {
-      final UUID fresh = UUID.randomUUID();
-      return fresh;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java b/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index 6c5637d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.state;
-
-/**
- * The kinds of external work that a task state machine may ask to have carried out as a
- * result of a transition.
- */
-enum WorkCommand {
-  /** Send an instruction for the runner of this task to kill the task. */
-  KILL,
-
-  /** Create a new state machine with a copy of this task. */
-  RESCHEDULE,
-
-  /**
-   * Update the task's state (schedule status) in the persistent store to match the state
-   * machine.
-   */
-  UPDATE_STATE,
-
-  /** Delete this task from the persistent store. */
-  DELETE,
-
-  /** Increment the failure count for this task. */
-  INCREMENT_FAILURES;
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
deleted file mode 100644
index a52c5c5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/AsyncStatsModule.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.BindingAnnotation;
-
-import org.apache.mesos.Protos.Offer;
-
-import com.twitter.aurora.gen.Quota;
-import com.twitter.aurora.scheduler.async.OfferQueue;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.stats.SlotSizeCounter.ResourceSlotProvider;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Module to configure export of cluster-wide resource allocation and consumption statistics.
- *
- * <p>The module binds the stat collectors as singletons, creates a dedicated single-threaded
- * scheduled executor, and registers {@link StatUpdater} as a startup action so that the
- * collectors are run periodically on that executor.
- */
-public class AsyncStatsModule extends AbstractModule {
-
-  @CmdLine(name = "async_task_stat_update_interval",
-      help = "Interval on which to try to update resource consumption stats.")
-  private static final Arg<Amount<Long, Time>> TASK_STAT_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "async_slot_stat_update_interval",
-      help = "Interval on which to try to update open slot stats.")
-  private static final Arg<Amount<Long, Time>> SLOT_STAT_INTERVAL =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  /**
-   * Binding annotation that distinguishes the stats executor from any other
-   * {@link ScheduledExecutorService} binding.
-   */
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface StatExecutor { }
-
-  @Override
-  protected void configure() {
-    // One daemon thread runs every stat update, so the updates never overlap each other.
-    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setNameFormat("AsyncStat-%d").setDaemon(true).build());
-
-    bind(TaskStatCalculator.class).in(Singleton.class);
-    bind(CachedCounters.class).in(Singleton.class);
-    bind(ResourceSlotProvider.class).to(OfferAdapter.class);
-    bind(SlotSizeCounter.class).in(Singleton.class);
-
-    bind(ScheduledExecutorService.class).annotatedWith(StatExecutor.class).toInstance(executor);
-    LifecycleModule.bindStartupAction(binder(), StatUpdater.class);
-  }
-
-  /**
-   * Startup action that schedules the periodic stat computations on the stats executor.
-   */
-  static class StatUpdater implements Command {
-    private final ScheduledExecutorService executor;
-    private final TaskStatCalculator taskStats;
-    private final SlotSizeCounter slotCounter;
-
-    @Inject
-    StatUpdater(
-        @StatExecutor ScheduledExecutorService executor,
-        TaskStatCalculator taskStats,
-        SlotSizeCounter slotCounter) {
-
-      this.executor = checkNotNull(executor);
-      this.taskStats = checkNotNull(taskStats);
-      this.slotCounter = checkNotNull(slotCounter);
-    }
-
-    /**
-     * Schedules both stat collectors at their configured intervals. The first run of each
-     * collector is delayed by one full interval.
-     */
-    @Override
-    public void execute() {
-      scheduleRepeating(taskStats, TASK_STAT_INTERVAL.get());
-      scheduleRepeating(slotCounter, SLOT_STAT_INTERVAL.get());
-    }
-
-    /**
-     * Schedules {@code work} at a fixed rate, using {@code interval} as both the initial delay
-     * and the period.
-     *
-     * <p>The interval is converted to milliseconds rather than seconds: a whole-second
-     * conversion truncates sub-second intervals to zero, which
-     * {@link ScheduledExecutorService#scheduleAtFixedRate} rejects, and silently drops the
-     * fractional part of intervals such as 1500 ms.
-     *
-     * @param work Task to run periodically.
-     * @param interval Delay before the first run and between subsequent runs.
-     */
-    private void scheduleRepeating(Runnable work, Amount<Long, Time> interval) {
-      long intervalMs = interval.as(Time.MILLISECONDS);
-      executor.scheduleAtFixedRate(work, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
-    }
-  }
-
-  /**
-   * Exposes the offers currently held by the {@link OfferQueue} as resource slots, expressing
-   * each offer's CPU, RAM and disk as an {@link IQuota}.
-   */
-  static class OfferAdapter implements ResourceSlotProvider {
-    private static final Function<Offer, IQuota> TO_QUOTA = new Function<Offer, IQuota>() {
-      @Override public IQuota apply(Offer offer) {
-        Resources resources = Resources.from(offer);
-        return IQuota.build(new Quota()
-            .setNumCpus(resources.getNumCpus())
-            .setRamMb(resources.getRam().as(Data.MB))
-            .setDiskMb(resources.getDisk().as(Data.MB)));
-      }
-    };
-
-    private final OfferQueue offerQueue;
-
-    @Inject
-    OfferAdapter(OfferQueue offerQueue) {
-      this.offerQueue = checkNotNull(offerQueue);
-    }
-
-    /**
-     * @return One quota-shaped resource description per offer returned by the offer queue.
-     */
-    @Override
-    public Iterable<IQuota> get() {
-      Iterable<Offer> offers = offerQueue.getOffers();
-      return FluentIterable.from(offers).transform(TO_QUOTA);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java b/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
deleted file mode 100644
index 81d6811..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/CachedCounters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import com.twitter.common.stats.StatsProvider;
-
-/**
- * A name-keyed cache of stat counters. Asking for the same name repeatedly yields the cached
- * counter instead of registering a new one with the stats provider each time.
- */
-class CachedCounters {
-  private final LoadingCache<String, AtomicLong> countersByName;
-
-  /**
-   * @param stats Provider used to create a counter the first time a name is requested.
-   */
-  @Inject
-  CachedCounters(final StatsProvider stats) {
-    CacheLoader<String, AtomicLong> counterFactory = new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String name) {
-        return stats.makeCounter(name);
-      }
-    };
-    countersByName = CacheBuilder.newBuilder().build(counterFactory);
-  }
-
-  /**
-   * Looks up the counter registered under {@code name}, creating it on first use.
-   *
-   * @param name Name of the counter.
-   * @return The counter associated with {@code name}.
-   */
-  AtomicLong get(String name) {
-    return countersByName.getUnchecked(name);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
deleted file mode 100644
index 7b96e86..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/ResourceCounter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.StorageException;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Computes aggregate metrics about resource allocation and consumption in the scheduler.
- *
- * <p>Every computation reads from the injected {@link Storage}; the only field this class
- * holds is that storage reference.
- */
-public class ResourceCounter {
-  private final Storage storage;
-
-  @Inject
-  ResourceCounter(Storage storage) {
-    this.storage = Preconditions.checkNotNull(storage);
-  }
-
-  /**
-   * Fetches the tasks matching {@code query} and maps each one to its task configuration.
-   *
-   * @param query Query selecting the tasks to fetch.
-   * @return The configurations of the matching tasks.
-   * @throws StorageException if there was a problem fetching tasks from storage.
-   */
-  private Iterable<ITaskConfig> getTasks(Query.Builder query) throws StorageException {
-    return Iterables.transform(
-        Storage.Util.consistentFetchTasks(storage, query),
-        Tasks.SCHEDULED_TO_INFO);
-  }
-
-  /**
-   * Computes totals for each of the {@link MetricType}s over all active tasks.
-   *
-   * @return aggregates for each global metric type.
-   * @throws StorageException if there was a problem fetching tasks from storage.
-   */
-  public List<GlobalMetric> computeConsumptionTotals() throws StorageException {
-    List<GlobalMetric> counts = Arrays.asList(
-        new GlobalMetric(MetricType.TOTAL_CONSUMED),
-        new GlobalMetric(MetricType.DEDICATED_CONSUMED),
-        new GlobalMetric(MetricType.QUOTA_CONSUMED),
-        new GlobalMetric(MetricType.FREE_POOL_CONSUMED));
-
-    // Each metric applies its own filter, so every task is offered to every metric.
-    for (ITaskConfig task : getTasks(Query.unscoped().active())) {
-      for (GlobalMetric count : counts) {
-        count.accumulate(task);
-      }
-    }
-    return counts;
-  }
-
-  /**
-   * Computes total quota allocations.
-   *
-   * @return Total allocated quota.
-   * @throws StorageException if there was a problem fetching quotas from storage.
-   */
-  public Metric computeQuotaAllocationTotals() throws StorageException {
-    return storage.weaklyConsistentRead(new Work.Quiet<Metric>() {
-      @Override public Metric apply(StoreProvider storeProvider) {
-        Metric allocation = new Metric();
-        for (IQuota quota : storeProvider.getQuotaStore().fetchQuotas().values()) {
-          allocation.accumulate(quota);
-        }
-        return allocation;
-      }
-    });
-  }
-
-  /**
-   * Computes arbitrary resource aggregates based on a query, a filter, and a grouping function.
-   *
-   * @param query Query to select tasks for aggregation.
-   * @param filter Filter to apply on query result tasks.
-   * @param keyFunction Function to define aggregation groupings.
-   * @param <K> Key type.
-   * @return A map from the keys to their aggregates based on the tasks fetched.
-   * @throws StorageException if there was a problem fetching tasks from storage.
-   */
-  public <K> Map<K, Metric> computeAggregates(
-      Query.Builder query,
-      Predicate<ITaskConfig> filter,
-      Function<ITaskConfig, K> keyFunction) throws StorageException {
-
-    // The loading cache creates an empty Metric the first time a key is requested.
-    LoadingCache<K, Metric> metrics = CacheBuilder.newBuilder()
-        .build(new CacheLoader<K, Metric>() {
-          @Override public Metric load(K key) {
-            return new Metric();
-          }
-        });
-    for (ITaskConfig task : Iterables.filter(getTasks(query), filter)) {
-      metrics.getUnchecked(keyFunction.apply(task)).accumulate(task);
-    }
-    return metrics.asMap();
-  }
-
-  /**
-   * The global consumption categories, each paired with the predicate that decides whether a
-   * task counts towards it.
-   */
-  public enum MetricType {
-    TOTAL_CONSUMED(Predicates.<ITaskConfig>alwaysTrue()),
-    DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return ConfigurationManager.isDedicated(task);
-      }
-    }),
-    QUOTA_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return task.isProduction();
-      }
-    }),
-    FREE_POOL_CONSUMED(new Predicate<ITaskConfig>() {
-      @Override public boolean apply(ITaskConfig task) {
-        return !ConfigurationManager.isDedicated(task) && !task.isProduction();
-      }
-    });
-
-    public final Predicate<ITaskConfig> filter;
-
-    MetricType(Predicate<ITaskConfig> filter) {
-      this.filter = filter;
-    }
-  }
-
-  /**
-   * A {@link Metric} that only accumulates tasks accepted by the filter of its
-   * {@link MetricType}.
-   */
-  public static class GlobalMetric extends Metric {
-    public final MetricType type;
-
-    public GlobalMetric(MetricType type) {
-      this.type = type;
-    }
-
-    @Override
-    protected void accumulate(ITaskConfig task) {
-      if (type.filter.apply(task)) {
-        super.accumulate(task);
-      }
-    }
-  }
-
-  /**
-   * Running totals of CPU, RAM and disk.
-   */
-  public static class Metric {
-    // CPU is summed as a double and truncated only when read. With a long accumulator,
-    // "cpu += <double>" narrows the result back to long on every addition, so the fractional
-    // share of each task would be discarded.
-    // NOTE(review): assumes getNumCpus() can return fractional values - confirm against the
-    // generated entity types. For integral values the totals are unchanged.
-    private double cpu;
-    private long ramMb;
-    private long diskMb;
-
-    /**
-     * Creates a metric with all totals at zero.
-     */
-    public Metric() {
-      // Fields already hold their zero defaults.
-    }
-
-    /**
-     * Creates a metric whose totals start at the values of {@code copy}.
-     *
-     * @param copy Metric to copy the totals from.
-     */
-    public Metric(Metric copy) {
-      this.cpu = copy.cpu;
-      this.ramMb = copy.ramMb;
-      this.diskMb = copy.diskMb;
-    }
-
-    /**
-     * Adds the resources of {@code task} to the totals.
-     *
-     * @param task Task configuration whose resources should be counted.
-     */
-    protected void accumulate(ITaskConfig task) {
-      cpu += task.getNumCpus();
-      ramMb += task.getRamMb();
-      diskMb += task.getDiskMb();
-    }
-
-    /**
-     * Adds the resources of {@code quota} to the totals.
-     *
-     * @param quota Quota whose resources should be counted.
-     */
-    protected void accumulate(IQuota quota) {
-      cpu += quota.getNumCpus();
-      ramMb += quota.getRamMb();
-      diskMb += quota.getDiskMb();
-    }
-
-    /**
-     * @return The accumulated CPU total, truncated to a whole number.
-     */
-    public long getCpu() {
-      return (long) cpu;
-    }
-
-    /**
-     * @return The accumulated RAM total divided by 1024, using integer division.
-     */
-    public long getRamGb() {
-      return ramMb / 1024;
-    }
-
-    /**
-     * @return The accumulated disk total divided by 1024, using integer division.
-     */
-    public long getDiskGb() {
-      return diskMb / 1024;
-    }
-  }
-}