You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:25 UTC
[32/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
new file mode 100644
index 0000000..a085d7d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Job scheduler that accepts any job and executes it immediately.
+ */
+class ImmediateJobManager extends JobManager {
+
+ private static final Logger LOG = Logger.getLogger(ImmediateJobManager.class.getName());
+
+ private final StateManager stateManager;
+ private final Storage storage;
+
+ @Inject
+ ImmediateJobManager(StateManager stateManager, Storage storage) {
+ this.stateManager = checkNotNull(stateManager);
+ this.storage = checkNotNull(storage);
+ }
+
+ @Override
+ public String getUniqueKey() {
+ return "IMMEDIATE";
+ }
+
+ @Override
+ public boolean receiveJob(SanitizedConfiguration config) {
+ LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
+ stateManager.insertPendingTasks(config.getTaskConfigs());
+ return true;
+ }
+
+ @Override
+ public boolean hasJob(final IJobKey jobKey) {
+ return !Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()).isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
new file mode 100644
index 0000000..3b8f5d8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
@@ -0,0 +1,119 @@
+package com.twitter.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * An action that either accepts a configuration or rejects it with a reason.
+ */
+public interface JobFilter {
+ /**
+ * Accept the JobConfiguration or reject it with a reason.
+ *
+ * @param jobConfiguration The job configuration to filter.
+ * @return A result and the reason the result was reached.
+ */
+ JobFilterResult filter(IJobConfiguration jobConfiguration);
+
+ /**
+ * Accept the TaskConfig with the specified number of instances
+ * or reject it with a reason.
+ *
+ * @param template The task configuration to filter.
+ * @param instanceCount Number of instances to apply taskConfig to.
+ * @return A result and the reason the result was reached.
+ */
+ JobFilterResult filter(ITaskConfig template, int instanceCount);
+
+ /**
+ * An indication of whether a job passed a filter or not.
+ */
+ public static final class JobFilterResult {
+ private final boolean pass;
+ private final String reason;
+
+ private JobFilterResult(boolean pass, String reason) {
+ this.pass = pass;
+ this.reason = Preconditions.checkNotNull(reason);
+ }
+
+ /**
+ * Create a new result indicating the job has passed the filter.
+ *
+ * @return A result indicating the job has passed with a default reason.
+ * @see #fail(String)
+ */
+ public static JobFilterResult pass() {
+ return new JobFilterResult(true, "Accepted by filter.");
+ }
+
+ /**
+ * Create a new result indicating the job has passed the filter for the given reason.
+ *
+ * @param reason A reason that the job has passed.
+ * @return A result indicating the job has passed because of the given reason.
+ * @throws NullPointerException if {@code reason} is {@code null}.
+ * @see #fail(String)
+ */
+ public static JobFilterResult pass(String reason) {
+ return new JobFilterResult(true, reason);
+ }
+
+ /**
+ * Create a new result indicating the job has failed the filter.
+ *
+ * @param reason A reason that the job has failed.
+ * @return A result indicating the job has failed because of the given reason.
+ * @throws NullPointerException if {@code reason} is {@code null}.
+ * @see #pass()
+ * @see #pass(String)
+ */
+ public static JobFilterResult fail(String reason) {
+ return new JobFilterResult(false, reason);
+ }
+
+ /**
+ * Indicates whether the result indicates a pass.
+ *
+ * @return {@code true} if the result indicates a pass.
+ */
+ public boolean isPass() {
+ return pass;
+ }
+
+ /**
+ * Indicates the reason for the result.
+ *
+ * @return The reason for the result.
+ */
+ public String getReason() {
+ return reason;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof JobFilterResult)) {
+ return false;
+ }
+ JobFilterResult that = (JobFilterResult) o;
+ return Objects.equal(pass, that.pass)
+ && Objects.equal(reason, that.reason);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(pass, reason);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("pass", pass)
+ .add("reason", reason)
+ .toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobManager.java b/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
new file mode 100644
index 0000000..4cb4ff6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Collections;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Base class for job managers. A job manager is responsible for deciding whether and when to
+ * trigger execution of a job.
+ */
+public abstract class JobManager {
+
+ // TODO(Bill Farner): Remove this. It is only used since the CronJobManager and SchedulerCoreImpl
+ // have a circular dependency.
+ @Inject
+ protected SchedulerCore schedulerCore;
+
+ /**
+ * Gets a key that uniquely identifies this manager type, to distinguish from other schedulers.
+ * These keys end up being persisted, so they must be considered permanently immutable.
+ *
+ * @return Job manager key.
+ */
+ public abstract String getUniqueKey();
+
+ /**
+ * Submits a job to the manager. The job may be submitted to the job runner before this method
+ * returns or at any point in the future. This method will return false if the manager will not
+ * execute the job.
+ *
+ * @param config The job to schedule.
+ * @return {@code true} If the manager accepted the job, {@code false} otherwise.
+ * @throws ScheduleException If there is a problem with scheduling the job.
+ */
+ public abstract boolean receiveJob(SanitizedConfiguration config) throws ScheduleException;
+
+ /**
+ * Fetches the configured jobs that this manager is storing.
+ *
+ * @return Jobs stored by this job manager.
+ */
+ // TODO(ksweeney): Consider adding a Map<JobKey, JobConfiguration> to complement this.
+ public Iterable<IJobConfiguration> getJobs() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Checks whether this manager is storing a job with the given key.
+ *
+ * @param jobKey Job key.
+ * @return {@code true} if the manager has a matching job, {@code false} otherwise.
+ */
+ public abstract boolean hasJob(IJobKey jobKey);
+
+ /**
+ * Instructs the manager to delete any jobs with the given key.
+ *
+ * @param jobKey Job key.
+ * @return {@code true} if a matching job was deleted.
+ */
+ public boolean deleteJob(IJobKey jobKey) {
+ // Optionally overridden by implementing class.
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
new file mode 100644
index 0000000..58d7b0e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * Defines all {@link ILock} primitives like: acquire, release, validate.
+ */
+public interface LockManager {
+ /**
+ * Creates, saves and returns a new {@link ILock} with the specified {@link ILockKey}.
+ * This method is not re-entrant, i.e. attempting to acquire a lock with the
+ * same key would throw a {@link LockException}.
+ *
+ * @param lockKey A key uniquely identifying the lock to be created.
+ * @param user Name of the user requesting a lock.
+ * @return A new ILock instance.
+ * @throws LockException If a lock with the specified key already exists.
+ */
+ ILock acquireLock(ILockKey lockKey, String user) throws LockException;
+
+ /**
+ * Releases (removes) the specified {@link ILock} from the system.
+ *
+ * @param lock {@link ILock} to remove from the system.
+ */
+ void releaseLock(ILock lock);
+
+ /**
+ * Verifies if the provided lock instance is identical to the one stored in the scheduler
+ * ONLY if the operation context represented by the {@link ILockKey} is in fact locked.
+ * No validation will be performed in case there is no corresponding scheduler lock
+ * found for the provided context.
+ *
+ * @param context Operation context to validate with the provided lock.
+ * @param heldLock Lock to validate.
+ * @throws LockException If the provided lock does not exist or differs from the stored one.
+ */
+ void validateIfLocked(ILockKey context, Optional<ILock> heldLock) throws LockException;
+
+ /**
+ * Thrown when {@link ILock} related operation failed.
+ */
+ class LockException extends Exception {
+ public LockException(String msg) {
+ super(msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
new file mode 100644
index 0000000..4667f9f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Date;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.gen.Lock;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Implements lock-related primitives required to provide mutual exclusion guarantees
+ * to the critical Scheduler state-mutating operations.
+ */
+class LockManagerImpl implements LockManager {
+ private final Storage storage;
+ private final Clock clock;
+ private final UUIDGenerator tokenGenerator;
+
+ @Inject
+ LockManagerImpl(Storage storage, Clock clock, UUIDGenerator tokenGenerator) {
+ this.storage = checkNotNull(storage);
+ this.clock = checkNotNull(clock);
+ this.tokenGenerator = checkNotNull(tokenGenerator);
+ }
+
+ @Override
+ public ILock acquireLock(final ILockKey lockKey, final String user) throws LockException {
+ return storage.write(new MutateWork<ILock, LockException>() {
+ @Override public ILock apply(Storage.MutableStoreProvider storeProvider)
+ throws LockException {
+
+ LockStore.Mutable lockStore = storeProvider.getLockStore();
+ Optional<ILock> existingLock = lockStore.fetchLock(lockKey);
+
+ if (existingLock.isPresent()) {
+ throw new LockException(String.format(
+ "Operation for: %s is already in progress. Started at: %s. Current owner: %s.",
+ formatLockKey(lockKey),
+ new Date(existingLock.get().getTimestampMs()).toString(),
+ existingLock.get().getUser()));
+ }
+
+ ILock lock = ILock.build(new Lock()
+ .setKey(lockKey.newBuilder())
+ .setToken(tokenGenerator.createNew().toString())
+ .setTimestampMs(clock.nowMillis())
+ .setUser(user));
+
+ lockStore.saveLock(lock);
+ return lock;
+ }
+ });
+ }
+
+ @Override
+ public void releaseLock(final ILock lock) {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override public void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getLockStore().removeLock(lock.getKey());
+ }
+ });
+ }
+
+ @Override
+ public synchronized void validateIfLocked(final ILockKey context, Optional<ILock> heldLock)
+ throws LockException {
+
+ Optional<ILock> stored = storage.consistentRead(new Work.Quiet<Optional<ILock>>() {
+ @Override public Optional<ILock> apply(StoreProvider storeProvider) {
+ return storeProvider.getLockStore().fetchLock(context);
+ }
+ });
+
+ // The implementation below assumes the following use cases:
+ // +-----------+-----------------+----------+
+ // | eq | held | not held |
+ // +-----------+-----------------+----------+
+ // |stored |(stored == held)?| invalid |
+ // +-----------+-----------------+----------+
+ // |not stored | invalid | valid |
+ // +-----------+-----------------+----------+
+ if (!stored.equals(heldLock)) {
+ if (stored.isPresent()) {
+ throw new LockException(String.format(
+ "Unable to perform operation for: %s. Use override/cancel option.",
+ formatLockKey(context)));
+ } else if (heldLock.isPresent()) {
+ throw new LockException(
+ String.format("Invalid operation context: %s", formatLockKey(context)));
+ }
+ }
+ }
+
+ private static String formatLockKey(ILockKey lockKey) {
+ switch (lockKey.getSetField()) {
+ case JOB:
+ return JobKeys.toPath(lockKey.getJob());
+ default:
+ return "Unknown lock key type: " + lockKey.getSetField();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
new file mode 100644
index 0000000..d447f52
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.HostStatus;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+
+/**
+ * Logic that puts hosts into maintenance mode, and triggers draining of hosts upon request.
+ * All state-changing functions return their results. Additionally, all state-changing functions
+ * will ignore requests to change state of unknown hosts and subsequently omit these hosts from
+ * return values.
+ */
+public interface MaintenanceController {
+
+ /**
+ * Places hosts in maintenance mode.
+ * Hosts in maintenance mode are less-preferred for scheduling.
+ * No change will be made for hosts that are not recognized, and unrecognized hosts will not be
+ * included in the result.
+ *
+ * @param hosts Hosts to put into maintenance mode.
+ * @return The adjusted state of the hosts.
+ */
+ Set<HostStatus> startMaintenance(Set<String> hosts);
+
+ /**
+ * Initiate a drain of all active tasks on {@code hosts}.
+ *
+ * @param hosts Hosts to drain.
+ * @return The adjusted state of the hosts. Hosts without any active tasks will be immediately
+ * moved to DRAINED.
+ */
+ Set<HostStatus> drain(Set<String> hosts);
+
+ /**
+ * Fetches the current maintenance mode of {@code host}.
+ *
+ * @param host Host to fetch state for.
+ * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the host is not known.
+ */
+ MaintenanceMode getMode(String host);
+
+ /**
+ * Fetches the current state of {@code hosts}.
+ *
+ * @param hosts Hosts to fetch state for.
+ * @return The state of the hosts.
+ */
+ Set<HostStatus> getStatus(Set<String> hosts);
+
+ /**
+ * Moves {@code hosts} out of maintenance mode, returning them to mode NONE.
+ *
+ * @param hosts Hosts to move out of maintenance mode.
+ * @return The adjusted state of the hosts.
+ */
+ Set<HostStatus> endMaintenance(Set<String> hosts);
+
+ class MaintenanceControllerImpl implements MaintenanceController, EventSubscriber {
+ private final Storage storage;
+ private final StateManager stateManager;
+ private final Closure<PubsubEvent> eventSink;
+
+ @Inject
+ public MaintenanceControllerImpl(
+ Storage storage,
+ StateManager stateManager,
+ Closure<PubsubEvent> eventSink) {
+
+ this.storage = checkNotNull(storage);
+ this.stateManager = checkNotNull(stateManager);
+ this.eventSink = checkNotNull(eventSink);
+ }
+
+ private Set<HostStatus> watchDrainingTasks(
+ MutableStoreProvider store,
+ Set<String> hosts,
+ Closure<Query.Builder> callback) {
+
+ Set<String> emptyHosts = Sets.newHashSet();
+ for (String host : hosts) {
+ // If there are no tasks on the host, immediately transition to DRAINED.
+ Query.Builder query = Query.slaveScoped(host).active();
+ Set<String> activeTasks = FluentIterable.from(store.getTaskStore().fetchTasks(query))
+ .transform(Tasks.SCHEDULED_TO_ID)
+ .toSet();
+ if (activeTasks.isEmpty()) {
+ emptyHosts.add(host);
+ } else {
+ callback.execute(query);
+ }
+ }
+
+ return ImmutableSet.<HostStatus>builder()
+ .addAll(setMaintenanceMode(store, emptyHosts, DRAINED))
+ .addAll(setMaintenanceMode(store, Sets.difference(hosts, emptyHosts), DRAINING))
+ .build();
+ }
+
+ private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
+ return watchDrainingTasks(store, hosts, Closures.<Query.Builder>noop());
+ }
+
+ private static final Predicate<HostAttributes> IS_DRAINING = new Predicate<HostAttributes>() {
+ @Override public boolean apply(HostAttributes attributes) {
+ return DRAINING == attributes.getMode();
+ }
+ };
+
+ /**
+ * Notifies the MaintenanceController that storage has started, and maintenance statuses are
+ * ready to be loaded.
+ *
+ * @param started Event.
+ */
+ @Subscribe
+ public void storageStarted(StorageStarted started) {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ Set<String> drainingHosts =
+ FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
+ .filter(IS_DRAINING)
+ .transform(HOST_NAME)
+ .toSet();
+ watchDrainingTasks(storeProvider, drainingHosts);
+ }
+ });
+ }
+
+ /**
+ * Notifies the MaintenanceController that a task has changed state.
+ *
+ * @param change Event.
+ */
+ @Subscribe
+ public void taskChangedState(final TaskStateChange change) {
+ if (Tasks.isTerminated(change.getNewState())) {
+ final String host = change.getTask().getAssignedTask().getSlaveHost();
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override public void execute(MutableStoreProvider store) {
+ // If the task _was_ associated with a draining host, and it was the last task on the
+ // host, the host is moved to DRAINED.
+ Optional<HostAttributes> attributes = store.getAttributeStore().getHostAttributes(host);
+ if (attributes.isPresent() && attributes.get().getMode() == DRAINING) {
+ Query.Builder builder = Query.slaveScoped(host).active();
+ if (store.getTaskStore().fetchTasks(builder).isEmpty()) {
+ setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public Set<HostStatus> startMaintenance(final Set<String> hosts) {
+ return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+ @Override public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
+ return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED);
+ }
+ });
+ }
+
+ @VisibleForTesting
+ static final Optional<String> DRAINING_MESSAGE =
+ Optional.of("Draining machine for maintenance.");
+
+ @Override
+ public Set<HostStatus> drain(final Set<String> hosts) {
+ return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+ @Override public Set<HostStatus> apply(MutableStoreProvider store) {
+ return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
+ @Override public void execute(Query.Builder query) {
+ stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
+ }
+ });
+ }
+ });
+ }
+
+ private static final Function<HostAttributes, String> HOST_NAME =
+ new Function<HostAttributes, String>() {
+ @Override public String apply(HostAttributes attributes) {
+ return attributes.getHost();
+ }
+ };
+
+ private static final Function<HostAttributes, HostStatus> ATTRS_TO_STATUS =
+ new Function<HostAttributes, HostStatus>() {
+ @Override public HostStatus apply(HostAttributes attributes) {
+ return new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode());
+ }
+ };
+
+ private static final Function<HostStatus, MaintenanceMode> GET_MODE =
+ new Function<HostStatus, MaintenanceMode>() {
+ @Override public MaintenanceMode apply(HostStatus status) {
+ return status.getMode();
+ }
+ };
+
+ @Override
+ public MaintenanceMode getMode(final String host) {
+ return storage.weaklyConsistentRead(new Work.Quiet<MaintenanceMode>() {
+ @Override public MaintenanceMode apply(StoreProvider storeProvider) {
+ return storeProvider.getAttributeStore().getHostAttributes(host)
+ .transform(ATTRS_TO_STATUS)
+ .transform(GET_MODE)
+ .or(MaintenanceMode.NONE);
+ }
+ });
+ }
+
+ @Override
+ public Set<HostStatus> getStatus(final Set<String> hosts) {
+ return storage.weaklyConsistentRead(new Work.Quiet<Set<HostStatus>>() {
+ @Override public Set<HostStatus> apply(StoreProvider storeProvider) {
+ // Warning - this is filtering _all_ host attributes. If using this to frequently query
+ // for a small set of hosts, a getHostAttributes variant should be added.
+ return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
+ .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
+ .transform(ATTRS_TO_STATUS).toSet();
+ }
+ });
+ }
+
+ @Override
+ public Set<HostStatus> endMaintenance(final Set<String> hosts) {
+ return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
+ @Override public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
+ return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE);
+ }
+ });
+ }
+
+ private Set<HostStatus> setMaintenanceMode(
+ MutableStoreProvider storeProvider,
+ Set<String> hosts,
+ MaintenanceMode mode) {
+
+ AttributeStore.Mutable store = storeProvider.getAttributeStore();
+ ImmutableSet.Builder<HostStatus> statuses = ImmutableSet.builder();
+ for (String host : hosts) {
+ if (store.setMaintenanceMode(host, mode)) {
+ HostStatus status = new HostStatus().setHost(host).setMode(mode);
+ eventSink.execute(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
+ statuses.add(status);
+ }
+ }
+ return statuses.build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
new file mode 100644
index 0000000..2db01c0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Scheduling core, stores scheduler state and makes decisions about which tasks to schedule when
+ * a resource offer is made.
+ * <p>
+ * When a job is submitted to the scheduler core, it will store the job configuration and offer
+ * the job to all configured scheduler modules, which are responsible for triggering execution of
+ * the job. Until a job is triggered by a scheduler module, it is retained in the scheduler core
+ * in the PENDING state.
+ */
+public interface SchedulerCore {
+
+ /**
+ * Creates a new job, whose tasks will become candidates for scheduling.
+ *
+ * @param sanitizedConfiguration The configuration of the job to create tasks for.
+ * @throws ScheduleException If the job cannot be scheduled, for example because it already
+ * exists, fails validation, or is not accepted by any scheduler module.
+ * @throws TaskDescriptionException If an invalid task description was given.
+ */
+ void createJob(SanitizedConfiguration sanitizedConfiguration)
+ throws ScheduleException, TaskDescriptionException;
+
+ /**
+ * Adds new instances, identified by the given instance IDs, to a job.
+ * <p>
+ * Provided instance IDs should be disjoint from the instance IDs active in the job.
+ *
+ * @param jobKey IJobKey identifying the parent job.
+ * @param instanceIds Set of instance IDs to be added to the job.
+ * @param config ITaskConfig to use with new instances.
+ * @throws ScheduleException If any of the provided instance IDs are already in use by the job.
+ */
+ void addInstances(IJobKey jobKey, ImmutableSet<Integer> instanceIds, ITaskConfig config)
+ throws ScheduleException;
+
+ /**
+ * Validates that the new job configuration passes resource filters.
+ *
+ * @param sanitizedConfiguration Job configuration to validate.
+ * @throws ScheduleException If job resources do not pass filters.
+ */
+ void validateJobResources(SanitizedConfiguration sanitizedConfiguration) throws ScheduleException;
+
+ /**
+ * Starts a cron job immediately.
+ *
+ * @param jobKey Job key.
+ * @throws ScheduleException If the specified job does not exist, or is not a cron job.
+ * @throws TaskDescriptionException If the parsing of the job failed.
+ */
+ void startCronJob(IJobKey jobKey) throws ScheduleException, TaskDescriptionException;
+
+ /**
+ * Assigns a new state to tasks.
+ *
+ * @param query Builder for a query to identify tasks.
+ * @param status The new state of the tasks.
+ * @param message Additional information about the state transition.
+ */
+ void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+
+ /**
+ * Kills a specific set of tasks.
+ *
+ * @param query Builder for a query to identify tasks.
+ * @param user Name of the user performing the kill.
+ * @throws ScheduleException If a problem occurs with the kill request, for example when there
+ * is nothing to kill.
+ */
+ void killTasks(Query.Builder query, String user) throws ScheduleException;
+
+ /**
+ * Initiates a restart of shards within an active job.
+ *
+ * @param jobKey Key of job to be restarted.
+ * @param shards Shards to be restarted.
+ * @param requestingUser User performing the restart action.
+ * @throws ScheduleException If the request is invalid or not all requested shards are active.
+ */
+ void restartShards(IJobKey jobKey, Set<Integer> shards, String requestingUser)
+ throws ScheduleException;
+
+ /**
+ * Preempts a task in favor of another.
+ *
+ * @param task Task being preempted.
+ * @param preemptingTask Task we are preempting in favor of.
+ * @throws ScheduleException If a problem occurs while trying to perform the preemption.
+ */
+ void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) throws ScheduleException;
+
+ /**
+ * Indicates to the scheduler that tasks were deleted on the assigned host.
+ *
+ * @param taskIds IDs of tasks that were deleted.
+ */
+ void tasksDeleted(Set<String> taskIds);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
new file mode 100644
index 0000000..cf74e49
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+import static com.twitter.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+
+/**
+ * Implementation of the scheduler core.
+ * <p>
+ * New jobs are offered to the cron job manager first and then to the immediate job manager.
+ * Task state transitions and insertion of new tasks are delegated to the state manager.
+ */
+class SchedulerCoreImpl implements SchedulerCore {
+  @Positive
+  @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
+  public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
+
+  private static final Logger LOG = Logger.getLogger(SchedulerCoreImpl.class.getName());
+
+  private final Storage storage;
+
+  private final CronJobManager cronScheduler;
+
+  // Schedulers that are responsible for triggering execution of jobs.
+  private final ImmutableList<JobManager> jobManagers;
+
+  // TODO(Bill Farner): Avoid using StateManagerImpl.
+  // State manager handles persistence of task modifications and state transitions.
+  private final StateManagerImpl stateManager;
+
+  private final TaskIdGenerator taskIdGenerator;
+  private final JobFilter jobFilter;
+
+  /**
+   * Creates a new core scheduler.
+   *
+   * @param storage Backing store implementation.
+   * @param cronScheduler Cron scheduler.
+   * @param immediateScheduler Immediate scheduler.
+   * @param stateManager Persistent state manager.
+   * @param taskIdGenerator Task ID generator.
+   * @param jobFilter Job filter.
+   */
+  @Inject
+  public SchedulerCoreImpl(
+      Storage storage,
+      CronJobManager cronScheduler,
+      ImmediateJobManager immediateScheduler,
+      StateManagerImpl stateManager,
+      TaskIdGenerator taskIdGenerator,
+      JobFilter jobFilter) {
+
+    this.storage = checkNotNull(storage);
+
+    // The immediate scheduler will accept any job, so it's important that other schedulers are
+    // placed first. ImmutableList.of() rejects null elements, which also null-checks both
+    // job managers.
+    this.jobManagers = ImmutableList.of(cronScheduler, immediateScheduler);
+    this.cronScheduler = cronScheduler;
+    this.stateManager = checkNotNull(stateManager);
+    this.taskIdGenerator = checkNotNull(taskIdGenerator);
+    this.jobFilter = checkNotNull(jobFilter);
+  }
+
+  /**
+   * Checks whether any job manager already has a job with the same key as the given job.
+   *
+   * @param job Job to look for.
+   * @return {@code true} if at least one job manager reports having the job.
+   */
+  private boolean hasActiveJob(IJobConfiguration job) {
+    return Iterables.any(jobManagers, managerHasJob(job));
+  }
+
+  /**
+   * Records that tasks were deleted on their host by moving them to {@code UNKNOWN}.
+   *
+   * @param taskIds IDs of tasks that were deleted.
+   */
+  @Override
+  public synchronized void tasksDeleted(Set<String> taskIds) {
+    setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+  }
+
+  /**
+   * Creates a job by offering it to each job manager in order until one accepts it.
+   *
+   * @param sanitizedConfiguration The configuration of the job to create tasks for.
+   * @throws ScheduleException If the job already exists, does not pass the job filters, or is not
+   *     accepted by any job manager.
+   */
+  @Override
+  public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
+      throws ScheduleException {
+
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    if (hasActiveJob(job)) {
+      throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+    }
+
+    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+
+    boolean accepted = false;
+    for (final JobManager manager : jobManagers) {
+      if (manager.receiveJob(sanitizedConfiguration)) {
+        LOG.info("Job accepted by manager: " + manager.getUniqueKey());
+        accepted = true;
+        break;
+      }
+    }
+
+    if (!accepted) {
+      LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
+      LOG.severe("Discarded job: " + job);
+      throw new ScheduleException("Job not accepted, discarding.");
+    }
+  }
+
+  // This number is derived from the maximum file name length limit on most UNIX systems, less
+  // the number of characters we've observed being added by mesos for the executor ID, prefix, and
+  // delimiters.
+  @VisibleForTesting
+  static final int MAX_TASK_ID_LENGTH = 255 - 90;
+
+  /**
+   * Validates a job (or an addition to a job) against the task ID length limit, the job filter
+   * and the per-job task limit, in that order.
+   *
+   * @param jobKey Key of the job being validated.
+   * @param task Task configuration to validate.
+   * @param count Number of instances being requested.
+   * @param incremental When {@code true}, the number of currently active tasks of the job is added
+   *     to {@code count} before validating.
+   * @throws ScheduleException If any of the checks fails.
+   */
+  // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
+  private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
+      throws ScheduleException {
+
+    int instanceCount = count;
+    if (incremental) {
+      instanceCount +=
+          Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size();
+    }
+
+    // TODO(maximk): This is a short-term hack to stop the bleeding from
+    // https://issues.apache.org/jira/browse/MESOS-691
+    if (taskIdGenerator.generate(task, instanceCount).length() > MAX_TASK_ID_LENGTH) {
+      throw new ScheduleException(
+          "Task ID is too long, please shorten your role or job name.");
+    }
+
+    JobFilter.JobFilterResult filterResult = jobFilter.filter(task, instanceCount);
+    // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
+    if (!filterResult.isPass()) {
+      throw new ScheduleException(filterResult.getReason());
+    }
+
+    if (instanceCount > MAX_TASKS_PER_JOB.get()) {
+      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
+    }
+  }
+
+  /**
+   * Runs the job filters against a job configuration without creating the job.
+   *
+   * @param sanitizedConfiguration Job configuration to validate.
+   * @throws ScheduleException If the configuration does not pass the filters.
+   */
+  @Override
+  public void validateJobResources(SanitizedConfiguration sanitizedConfiguration)
+      throws ScheduleException {
+
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+  }
+
+  /**
+   * Adds instances to a job after validating the resulting job size.
+   *
+   * @param jobKey IJobKey identifying the parent job.
+   * @param instanceIds Set of instance IDs to be added to the job.
+   * @param config ITaskConfig to use with new instances.
+   * @throws ScheduleException If the filters fail or any requested instance ID is already used by
+   *     an active task of the job.
+   */
+  @Override
+  public void addInstances(
+      final IJobKey jobKey,
+      final ImmutableSet<Integer> instanceIds,
+      final ITaskConfig config) throws ScheduleException {
+
+    runJobFilters(jobKey, config, instanceIds.size(), true);
+    storage.write(new MutateWork.NoResult<ScheduleException>() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider)
+          throws ScheduleException {
+
+        ImmutableSet<IScheduledTask> tasks =
+            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
+
+        Set<Integer> existingInstanceIds =
+            FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
+        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
+          throw new ScheduleException("Instance ID collision detected.");
+        }
+
+        stateManager.insertPendingTasks(Maps.asMap(instanceIds, Functions.constant(config)));
+      }
+    });
+  }
+
+  /**
+   * Triggers an immediate run of a cron job.
+   *
+   * @param jobKey Job key.
+   * @throws ScheduleException If the cron scheduler has no job for the key.
+   * @throws TaskDescriptionException If the parsing of the job failed.
+   */
+  @Override
+  public synchronized void startCronJob(IJobKey jobKey)
+      throws ScheduleException, TaskDescriptionException {
+
+    checkNotNull(jobKey);
+
+    if (!cronScheduler.hasJob(jobKey)) {
+      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
+    }
+
+    cronScheduler.startJobNow(jobKey);
+  }
+
+  /**
+   * Creates a predicate that will determine whether a job manager has a job matching a job key.
+   *
+   * @param job Job to match.
+   * @return A new predicate matching managers that have a job with the given job's key.
+   */
+  private static Predicate<JobManager> managerHasJob(final IJobConfiguration job) {
+    return new Predicate<JobManager>() {
+      @Override public boolean apply(JobManager manager) {
+        return manager.hasJob(job.getKey());
+      }
+    };
+  }
+
+  /**
+   * Moves all tasks matching the query into the given state.
+   *
+   * @param query Builder for a query to identify tasks.
+   * @param status The new state of the tasks.
+   * @param message Additional information about the state transition.
+   */
+  @Override
+  public synchronized void setTaskStatus(
+      Query.Builder query,
+      final ScheduleStatus status,
+      Optional<String> message) {
+
+    checkNotNull(query);
+    checkNotNull(status);
+
+    stateManager.changeState(query, status, message);
+  }
+
+  /**
+   * Kills the tasks matching a query, and deletes the job from the job managers when the query
+   * addresses a whole job.
+   *
+   * @param query Builder for a query to identify tasks.
+   * @param user Name of the user performing the kill.
+   * @throws ScheduleException If no job was deleted and no task changed state.
+   */
+  @Override
+  public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+    checkNotNull(query);
+    LOG.info("Killing tasks matching " + query);
+
+    boolean jobDeleted = false;
+
+    if (Query.isOnlyJobScoped(query)) {
+      // If this looks like a query for all tasks in a job, instruct the scheduler modules to
+      // delete the job.
+      IJobKey jobKey = JobKeys.from(query).get();
+      for (JobManager manager : jobManagers) {
+        if (manager.deleteJob(jobKey)) {
+          jobDeleted = true;
+        }
+      }
+    }
+
+    // Unless statuses were specifically supplied, only attempt to kill active tasks.
+    // A query that already names statuses is honored as-is; only a query without statuses is
+    // narrowed to the active states.
+    Query.Builder taskQuery = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
+
+    int tasksAffected =
+        stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
+    if (!jobDeleted && (tasksAffected == 0)) {
+      throw new ScheduleException("No jobs to kill");
+    }
+  }
+
+  /**
+   * Restarts the requested shards of a job, provided every one of them is currently active.
+   *
+   * @param jobKey Key of job to be restarted.
+   * @param shards Shards to be restarted.
+   * @param requestingUser User performing the restart action.
+   * @throws ScheduleException If the job key is invalid, no shards were given, or the number of
+   *     active tasks found differs from the number of requested shards.
+   */
+  @Override
+  public void restartShards(
+      IJobKey jobKey,
+      final Set<Integer> shards,
+      final String requestingUser) throws ScheduleException {
+
+    if (!JobKeys.isValid(jobKey)) {
+      throw new ScheduleException("Invalid job key: " + jobKey);
+    }
+
+    if (shards.isEmpty()) {
+      throw new ScheduleException("At least one shard must be specified.");
+    }
+
+    final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
+    storage.write(new MutateWork.NoResult<ScheduleException>() {
+      @Override protected void execute(MutableStoreProvider storeProvider)
+          throws ScheduleException {
+
+        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+        if (matchingTasks.size() != shards.size()) {
+          throw new ScheduleException("Not all requested shards are active.");
+        }
+        LOG.info("Restarting shards matching " + query);
+        stateManager.changeState(
+            Query.taskScoped(Tasks.ids(matchingTasks)),
+            RESTARTING,
+            Optional.of("Restarted by " + requestingUser));
+      }
+    });
+  }
+
+  /**
+   * Moves a task to {@code PREEMPTING}, recording the ID of the task it makes room for.
+   *
+   * @param task Task being preempted.
+   * @param preemptingTask Task we are preempting in favor of.
+   */
+  @Override
+  public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
+    checkNotNull(task);
+    checkNotNull(preemptingTask);
+    // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
+
+    stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+        Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
new file mode 100644
index 0000000..23ffff9
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StorageException;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.common.base.Closure;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around the persistent storage and mutable state.
+ * <p>
+ * Work submitted through {@link #write(SideEffectWork)} may queue pubsub events. The queued events
+ * are handed to the event sink once the outermost write operation has completed normally.
+ */
+class SideEffectStorage {
+
+  // Events queued by in-flight work, awaiting delivery by the top-level operation.
+  private final Queue<PubsubEvent> events = Lists.newLinkedList();
+
+  // Set while a write operation is in progress; used to detect the top-level operation.
+  private final AtomicBoolean inOperation = new AtomicBoolean(false);
+
+  private final Storage storage;
+  private final OperationFinalizer operationFinalizer;
+  private final Closure<PubsubEvent> taskEventSink;
+
+  interface OperationFinalizer {
+    /**
+     * Performs any work necessary to complete the operation.
+     * This is executed in the context of a write operation, immediately after the work
+     * executes normally.
+     * NOTE: At present, this is executed for every nesting level of operations, rather than
+     * at the completion of the top-level operation.
+     * See comment in {@code SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
+     * for more detail.
+     *
+     * @param work Work to finalize.
+     * @param storeProvider Mutable store reference.
+     */
+    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Creates a side-effect storage wrapper.
+   *
+   * @param storage Storage that write and read operations are delegated to.
+   * @param operationFinalizer Finalizer invoked after each unit of work executes normally.
+   * @param taskEventSink Sink that receives the queued pubsub events.
+   */
+  SideEffectStorage(
+      Storage storage,
+      OperationFinalizer operationFinalizer,
+      Closure<PubsubEvent> taskEventSink) {
+
+    this.storage = checkNotNull(storage);
+    this.operationFinalizer = checkNotNull(operationFinalizer);
+    this.taskEventSink = checkNotNull(taskEventSink);
+  }
+
+  /**
+   * Exposes the live queue of pending events.
+   *
+   * @return The queue of events that have been added but not yet delivered.
+   */
+  @VisibleForTesting
+  Queue<PubsubEvent> getEvents() {
+    return events;
+  }
+
+  /**
+   * Perform a unit of work in a mutating operation. This supports nesting/reentrancy.
+   *
+   * @param work Work to perform.
+   * @param <T> Work return type
+   * @param <E> Work exception type.
+   * @return The work return value.
+   * @throws E The work exception.
+   */
+  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
+    return storage.write(executeSideEffectsAfter(work));
+  }
+
+  /**
+   * Performs a read by delegating directly to the underlying storage.
+   *
+   * @param work Work to perform.
+   * @param <T> Work return type.
+   * @param <E> Work exception type.
+   * @return The work return value.
+   * @throws StorageException If the storage layer fails.
+   * @throws E The work exception.
+   */
+  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    return storage.consistentRead(work);
+  }
+
+  /**
+   * Work that has side effects external to the storage system.
+   * Work may add side effect and pubsub events, which will be executed/sent upon normal
+   * completion of the operation.
+   *
+   * @param <T> Work return type.
+   * @param <E> Work exception type.
+   */
+  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
+    /**
+     * Queues an event for delivery. May only be called while an operation is in progress.
+     *
+     * @param notice Event to queue, must not be {@code null}.
+     */
+    protected final void addTaskEvent(PubsubEvent notice) {
+      Preconditions.checkState(inOperation.get());
+      events.add(checkNotNull(notice));
+    }
+  }
+
+  /**
+   * Work with side effects which does not throw checked exceptions.
+   *
+   * @param <T> Work return type.
+   */
+  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
+  }
+
+  /**
+   * Work with side effects that does not have a return value.
+   *
+   * @param <E> Work exception type.
+   */
+  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
+
+    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
+      execute(storeProvider);
+      return null;
+    }
+
+    /**
+     * Performs the work.
+     *
+     * @param storeProvider Mutable store reference.
+     * @throws E The work exception.
+     */
+    abstract void execute(MutableStoreProvider storeProvider) throws E;
+  }
+
+  /**
+   * Work with side effects which does not throw checked exceptions or have a return
+   * value.
+   */
+  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
+  }
+
+  /**
+   * Wraps work so that the operation finalizer runs after it, and so that the queued events are
+   * delivered when the wrapped work is the top-level (outermost) operation.
+   *
+   * @param work Work to wrap.
+   * @param <T> Work return type.
+   * @param <E> Work exception type.
+   * @return Work that applies {@code work} followed by its side effects.
+   */
+  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
+      final SideEffectWork<T, E> work) {
+
+    return new MutateWork<T, E>() {
+      @Override public T apply(MutableStoreProvider storeProvider) throws E {
+        // Only the caller that flips the flag from false to true is the top-level operation.
+        boolean topLevelOperation = inOperation.compareAndSet(false, true);
+
+        try {
+          T result = work.apply(storeProvider);
+
+          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
+          // seems wrong. Double-check whether this is necessary, or if only the top-level
+          // operation should be executing the finalizer. Update doc on OperationFinalizer
+          // once this is assessed.
+          operationFinalizer.finalize(work, storeProvider);
+          if (topLevelOperation) {
+            while (!events.isEmpty()) {
+              taskEventSink.execute(events.remove());
+            }
+          }
+          return result;
+        } finally {
+          // NOTE(review): when the work or the finalizer throws, events queued so far are left
+          // in the queue and will be delivered by the next top-level operation that completes
+          // normally. Confirm whether they should be discarded here instead.
+          if (topLevelOperation) {
+            inOperation.set(false);
+          }
+        }
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
new file mode 100644
index 0000000..099ec70
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.SlaveID;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Thin interface for the state manager, covering task state changes, host assignment,
+ * insertion of pending tasks and deletion of task records.
+ */
+public interface StateManager {
+
+ /**
+ * Performs a simple state change, transitioning all tasks matching a query to the given
+ * state and applying the given audit message.
+ * TODO(William Farner): Consider removing the return value.
+ *
+ * @param query Builder of the query to perform, the results of which will be modified.
+ * @param newState State to move the resulting tasks into.
+ * @param auditMessage Audit message to apply along with the state change.
+ * @return The number of successful state changes.
+ */
+ int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+
+ /**
+ * Assigns a task to a specific slave.
+ * This will modify the task record to reflect the host assignment and return the updated record.
+ * <p>
+ * Callers must be prepared for a {@code null} return value when the task does not exist.
+ *
+ * @param taskId ID of the task to mutate.
+ * @param slaveHost Host name that the task is being assigned to.
+ * @param slaveId ID of the slave that the task is being assigned to.
+ * @param assignedPorts Ports on the host that are being assigned to the task.
+ * @return The updated task record, or {@code null} if the task was not found.
+ */
+ IAssignedTask assignTask(
+ String taskId,
+ String slaveHost,
+ SlaveID slaveId,
+ Set<Integer> assignedPorts);
+
+ /**
+ * Inserts new tasks into the store. Tasks will immediately move into PENDING and will be eligible
+ * for scheduling.
+ *
+ * @param tasks Tasks to insert, mapped by their instance IDs.
+ */
+ void insertPendingTasks(Map<Integer, ITaskConfig> tasks);
+
+ /**
+ * Deletes records of tasks from the task store.
+ * This will not perform any state checking or state transitions, but will immediately remove
+ * the tasks from the store. Attempts to delete task IDs that do not exist are silently
+ * ignored.
+ *
+ * @param taskIds IDs of tasks to delete.
+ */
+ void deleteTasks(final Set<String> taskIds);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
new file mode 100644
index 0000000..37d13f4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -0,0 +1,458 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.mesos.Protos.SlaveID;
+
+import com.twitter.aurora.gen.AssignedTask;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.scheduler.Driver;
+import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.base.Closure;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+
+import static com.twitter.aurora.gen.ScheduleStatus.INIT;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.UNKNOWN;
+import static com.twitter.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Manager of all persistence-related operations for the scheduler. Acts as a controller for
+ * persisted state machine transitions, and their side-effects.
+ *
+ * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
+ * modify managerState.
+ */
+public class StateManagerImpl implements StateManager {
+ private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
+
+ // Storage wrapper that this manager reads and writes through. It is built in the constructor
+ // from the injected Storage, an OperationFinalizer and the task event sink.
+ private final SideEffectStorage storage;
+
+ /**
+ * Exposes the wrapped storage; annotated as visible for tests only.
+ *
+ * @return the {@link SideEffectStorage} used by this manager.
+ */
+ @VisibleForTesting
+ SideEffectStorage getStorage() {
+ return storage;
+ }
+
+ private final TaskIdGenerator taskIdGenerator;
+
+ // Work queue to receive state machine side effect work.
+ // Items are sorted to place DELETE entries last. This is to ensure that within an operation,
+ // a delete is always processed after a state transition.
+ private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
+ new Comparator<WorkEntry>() {
+ @Override public int compare(WorkEntry a, WorkEntry b) {
+ if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
+ return (a.command == WorkCommand.DELETE) ? 1 : -1;
+ } else {
+ return 0;
+ }
+ }
+ });
+
+ // Adapt the work queue into a sink.
+ private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
+ @Override public void addWork(
+ WorkCommand work,
+ TaskStateMachine stateMachine,
+ Function<IScheduledTask, IScheduledTask> mutation) {
+
+ workQueue.add(new WorkEntry(work, stateMachine, mutation));
+ }
+ };
+
+ // Converts an (instance ID, task config) entry into a new scheduled task record in the INIT
+ // state, carrying a task ID obtained from the task ID generator.
+ private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
+ new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
+ @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
+ Integer instanceId = entry.getKey();
+ ITaskConfig config = entry.getValue();
+ String taskId = taskIdGenerator.generate(config, instanceId);
+
+ AssignedTask assignedTask = new AssignedTask();
+ assignedTask.setTaskId(taskId);
+ assignedTask.setInstanceId(instanceId);
+ assignedTask.setTask(config.newBuilder());
+
+ ScheduledTask scheduledTask = new ScheduledTask();
+ scheduledTask.setStatus(INIT);
+ scheduledTask.setAssignedTask(assignedTask);
+ return IScheduledTask.build(scheduledTask);
+ }
+ };
+
+ // Scheduler driver; used while draining the work queue to kill tasks (KILL work command).
+ private final Driver driver;
+ // Clock passed to every TaskStateMachine constructed by this manager.
+ private final Clock clock;
+
+ /**
+ * An item of work on the work queue: the command to perform, the state machine it was queued
+ * for, and the task mutation that is applied when an UPDATE_STATE command is processed.
+ */
+ private static class WorkEntry {
+ private final WorkCommand command;
+ private final TaskStateMachine stateMachine;
+ private final Function<IScheduledTask, IScheduledTask> mutation;
+
+ WorkEntry(
+ WorkCommand command,
+ TaskStateMachine stateMachine,
+ Function<IScheduledTask, IScheduledTask> mutation) {
+
+ this.command = command;
+ this.stateMachine = stateMachine;
+ this.mutation = mutation;
+ }
+ }
+
+ /**
+ * Creates a state manager layered over the given storage.
+ *
+ * @param storage Underlying storage, wrapped in a {@link SideEffectStorage}.
+ * @param clock Clock passed on to the task state machines.
+ * @param driver Driver used to kill tasks.
+ * @param taskIdGenerator Generator for the IDs of new and rescheduled tasks.
+ * @param taskEventSink Sink handed to the {@link SideEffectStorage}.
+ */
+ @Inject
+ StateManagerImpl(
+ final Storage storage,
+ final Clock clock,
+ Driver driver,
+ TaskIdGenerator taskIdGenerator,
+ Closure<PubsubEvent> taskEventSink) {
+
+ checkNotNull(storage);
+ this.clock = checkNotNull(clock);
+
+ // The finalizer hands control to processWorkQueueInWriteOperation(), which drains the
+ // side-effect work queued by the state machines.
+ this.storage = new SideEffectStorage(
+ storage,
+ new OperationFinalizer() {
+ @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
+ processWorkQueueInWriteOperation(work, store);
+ }
+ },
+ taskEventSink);
+
+ this.driver = checkNotNull(driver);
+ this.taskIdGenerator = checkNotNull(taskIdGenerator);
+
+ Stats.exportSize("work_queue_depth", workQueue);
+ }
+
+ @Override
+ public void insertPendingTasks(final Map<Integer, ITaskConfig> tasks) {
+ checkNotNull(tasks);
+
+ // Done outside the write transaction to minimize the work done inside a transaction.
+ final Set<IScheduledTask> scheduledTasks =
+ ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
+
+ storage.write(storage.new NoResultQuietSideEffectWork() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
+
+ for (IScheduledTask task : scheduledTasks) {
+ createStateMachine(task).updateState(PENDING);
+ }
+ }
+ });
+ }
+
+ /**
+ * Requests a transition to {@code newState} for every task matched by the query.
+ *
+ * @param query Selects the tasks to change.
+ * @param newState State to move the matched tasks into.
+ * @param auditMessage Audit message passed along with the state change.
+ * @return the number of state machines that reported a successful change.
+ */
+ @Override
+ public int changeState(
+ Query.Builder query,
+ final ScheduleStatus newState,
+ final Optional<String> auditMessage) {
+
+ // Applied to the state machine of each matched task.
+ Function<TaskStateMachine, Boolean> transition = new Function<TaskStateMachine, Boolean>() {
+ @Override public Boolean apply(TaskStateMachine stateMachine) {
+ return stateMachine.updateState(newState, auditMessage);
+ }
+ };
+ return changeState(query, transition);
+ }
+
+ /**
+ * Assigns a task to a slave by requesting its transition to ASSIGNED and recording the host,
+ * slave ID and ports on the task record.
+ *
+ * @param taskId ID of the task to assign; must not be blank.
+ * @param slaveHost Host name the task is assigned to; must not be blank.
+ * @param slaveId ID of the slave the task is assigned to; must not be null.
+ * @param assignedPorts Ports on the host being assigned to the task; must not be null.
+ * @return The updated assigned task, or {@code null} if the mutation was never applied
+ * (for example because no task matched {@code taskId}).
+ */
+ @Override
+ public IAssignedTask assignTask(
+ String taskId,
+ String slaveHost,
+ SlaveID slaveId,
+ Set<Integer> assignedPorts) {
+
+ checkNotBlank(taskId);
+ checkNotBlank(slaveHost);
+ // slaveId is dereferenced later, inside the storage mutation; reject null up front so the
+ // failure is reported before any write operation starts.
+ checkNotNull(slaveId);
+ checkNotNull(assignedPorts);
+
+ TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
+ changeState(Query.taskScoped(taskId), mutation);
+
+ return mutation.getAssignedTask();
+ }
+
+ /**
+ * Applies a state change to the state machine of each given task ID.
+ *
+ * @param taskIds IDs of the tasks to change.
+ * @param stateChange Change to apply; returns whether it succeeded.
+ * @return the number of state machines for which the change returned true.
+ */
+ private int changeStateInWriteOperation(
+ Set<String> taskIds,
+ Function<TaskStateMachine, Boolean> stateChange) {
+
+ Map<String, TaskStateMachine> stateMachines = getStateMachines(taskIds);
+ int successes = 0;
+ for (TaskStateMachine machine : stateMachines.values()) {
+ boolean changed = stateChange.apply(machine);
+ successes += changed ? 1 : 0;
+ }
+ return successes;
+ }
+
+ /**
+ * Runs a write operation that looks up the tasks matching the query and applies the state
+ * change to each of them.
+ *
+ * @param query Selects the tasks to change.
+ * @param stateChange Change to apply to each matched task's state machine.
+ * @return the number of successful state changes.
+ */
+ private int changeState(
+ final Query.Builder query,
+ final Function<TaskStateMachine, Boolean> stateChange) {
+
+ return storage.write(storage.new QuietSideEffectWork<Integer>() {
+ @Override public Integer apply(MutableStoreProvider storeProvider) {
+ Iterable<IScheduledTask> matches = storeProvider.getTaskStore().fetchTasks(query);
+ Set<String> matchedIds =
+ FluentIterable.from(matches).transform(Tasks.SCHEDULED_TO_ID).toSet();
+ return changeStateInWriteOperation(matchedIds, stateChange);
+ }
+ });
+ }
+
+ /**
+ * A state change that also exposes the assigned task record it produced, so that the caller
+ * can read it back after the change has been applied.
+ */
+ private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
+ /**
+ * @return the assigned task captured by this mutation. In the implementation returned by
+ * assignHost() this is {@code null} until the mutation has been applied to a task.
+ */
+ IAssignedTask getAssignedTask();
+ }
+
+ /**
+ * Pairs each requested port name with one of the allocated port numbers.
+ *
+ * @param portNames Names of the ports that need a number; must not be null.
+ * @param allocatedPorts Port numbers available for mapping.
+ * @return A map from port name to the port number chosen for it.
+ * @throws IllegalArgumentException if there are fewer allocated ports than port names.
+ */
+ private static Map<String, Integer> getNameMappedPorts(
+ Set<String> portNames,
+ Set<Integer> allocatedPorts) {
+
+ Preconditions.checkNotNull(portNames);
+
+ // Hand ports out from a copy, removing each one as it is used, so that leftovers can be
+ // detected afterwards without touching the caller's set.
+ Set<Integer> unusedPorts = Sets.newHashSet(allocatedPorts);
+ Iterator<Integer> available = unusedPorts.iterator();
+ Map<String, Integer> mapping = Maps.newHashMap();
+
+ for (String name : portNames) {
+ Preconditions.checkArgument(available.hasNext(),
+ "Allocated ports %s were not sufficient to expand task.", allocatedPorts);
+ int port = available.next();
+ available.remove();
+ mapping.put(name, port);
+ }
+
+ if (!unusedPorts.isEmpty()) {
+ LOG.warning("Not all allocated ports were used to map ports!");
+ }
+
+ return mapping;
+ }
+
+ /**
+ * Builds the state change used by assignTask(): a request to move a task to ASSIGNED that
+ * also stamps the task record with its slave host, slave ID and ports.
+ *
+ * @param slaveHost Host name recorded on the task.
+ * @param slaveId Slave whose ID value is recorded on the task.
+ * @param assignedPorts Port numbers to map onto the task's requested port names.
+ * @return A mutation that requests the ASSIGNED transition and exposes the resulting assigned
+ * task through {@code getAssignedTask()}.
+ */
+ private TaskAssignMutation assignHost(
+ final String slaveHost,
+ final SlaveID slaveId,
+ final Set<Integer> assignedPorts) {
+
+ // Record-level change: fills in the named ports, slave host and slave ID on a copy of the
+ // task and returns the rebuilt immutable record.
+ final TaskMutation mutation = new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask builder = task.newBuilder();
+ AssignedTask assigned = builder.getAssignedTask();
+ assigned.setAssignedPorts(
+ getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+ assigned.setSlaveHost(slaveHost)
+ .setSlaveId(slaveId.getValue());
+ return IScheduledTask.build(builder);
+ }
+ };
+
+ return new TaskAssignMutation() {
+ // Holds the assigned task produced by the mutation; stays null until the mutation runs.
+ private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
+ @Override public IAssignedTask getAssignedTask() {
+ return assignedTask.get();
+ }
+
+ @Override public Boolean apply(final TaskStateMachine stateMachine) {
+ // Wrap the record-level change so that its result is captured for getAssignedTask().
+ TaskMutation wrapper = new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ IScheduledTask mutated = mutation.apply(task);
+ // The reference may be set only once; a second result fails the state check.
+ Preconditions.checkState(
+ assignedTask.compareAndSet(null, mutated.getAssignedTask()),
+ "More than one result was found for an identity query.");
+ return mutated;
+ }
+ };
+ return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+ }
+ };
+ }
+
+ /**
+ * Drains the work queue, carrying out each queued command against the given store.
+ * The constructor registers this method as the body of the storage OperationFinalizer.
+ *
+ * @param sideEffectWork Work item on which pubsub events raised while draining are recorded.
+ * @param storeProvider Provider of the mutable task store the commands operate on.
+ */
+ private void processWorkQueueInWriteOperation(
+ SideEffectWork<?, ?> sideEffectWork,
+ MutableStoreProvider storeProvider) {
+
+ // consumingIterable removes entries as they are visited, so the queue is empty afterwards.
+ for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
+ final TaskStateMachine stateMachine = work.stateMachine;
+
+ if (work.command == WorkCommand.KILL) {
+ // KILL is the only command that does not touch the task store.
+ driver.killTask(stateMachine.getTaskId());
+ } else {
+ TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ String taskId = stateMachine.getTaskId();
+ Query.Builder idQuery = Query.taskScoped(taskId);
+
+ switch (work.command) {
+ case RESCHEDULE:
+ // Clone the stored task, clear its slave, port and event data, link it to the
+ // original through the ancestor ID, and give it a newly generated task ID.
+ ScheduledTask builder =
+ Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
+ builder.getAssignedTask().unsetSlaveId();
+ builder.getAssignedTask().unsetSlaveHost();
+ builder.getAssignedTask().unsetAssignedPorts();
+ builder.unsetTaskEvents();
+ builder.setAncestorId(taskId);
+ String newTaskId = taskIdGenerator.generate(
+ ITaskConfig.build(builder.getAssignedTask().getTask()),
+ builder.getAssignedTask().getInstanceId());
+ builder.getAssignedTask().setTaskId(newTaskId);
+
+ LOG.info("Task being rescheduled: " + taskId);
+
+ IScheduledTask task = IScheduledTask.build(builder);
+ taskStore.saveTasks(ImmutableSet.of(task));
+
+ // The replacement gets its own state machine (sharing this manager's work sink) and
+ // is asked to move to PENDING.
+ createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
+ ITaskConfig taskInfo = task.getAssignedTask().getTask();
+ sideEffectWork.addTaskEvent(
+ new PubsubEvent.TaskRescheduled(
+ taskInfo.getOwner().getRole(),
+ taskInfo.getJobName(),
+ task.getAssignedTask().getInstanceId()));
+ break;
+
+ case UPDATE_STATE:
+ // Persist the state machine's current state, then apply the queued mutation to the
+ // updated record.
+ taskStore.mutateTasks(idQuery, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ return work.mutation.apply(
+ IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
+ }
+ });
+ // Publish the change using the record as it now reads from the store.
+ sideEffectWork.addTaskEvent(
+ new PubsubEvent.TaskStateChange(
+ Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
+ stateMachine.getPreviousState()));
+ break;
+
+ case DELETE:
+ // Delegates to the public deleteTasks(), which issues its own storage.write() call.
+ deleteTasks(ImmutableSet.of(taskId));
+ break;
+
+ case INCREMENT_FAILURES:
+ // Bump the failure count stored on the task record.
+ taskStore.mutateTasks(idQuery, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ return IScheduledTask.build(
+ task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+ }
+ });
+ break;
+
+ default:
+ // Unknown commands are logged and otherwise ignored.
+ LOG.severe("Unrecognized work command type " + work.command);
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the given tasks from the task store and records a TasksDeleted event for the task
+ * records that were found. No state checks or state transitions are performed.
+ *
+ * @param taskIds IDs of the tasks to delete; must not be null.
+ */
+ @Override
+ public void deleteTasks(final Set<String> taskIds) {
+ // Reject a null set before opening a write operation, matching the argument checks made by
+ // the other public mutators of this class.
+ checkNotNull(taskIds);
+
+ storage.write(storage.new NoResultQuietSideEffectWork() {
+ @Override protected void execute(final MutableStoreProvider storeProvider) {
+ TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ // Fetch the records first so the event can carry the tasks that are about to be removed.
+ Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+ addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
+ taskStore.deleteTasks(taskIds);
+ }
+ });
+ }
+
+ /**
+ * Builds a state machine for every requested task ID, whether or not the task is stored.
+ *
+ * @param taskIds IDs of the tasks to build state machines for.
+ * @return An immutable map from each requested task ID to its state machine.
+ */
+ private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
+ return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
+ @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
+ Function<IScheduledTask, String> toTaskId = new Function<IScheduledTask, String>() {
+ @Override public String apply(IScheduledTask input) {
+ return input.getAssignedTask().getTaskId();
+ }
+ };
+ Map<String, IScheduledTask> tasksById = Maps.uniqueIndex(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
+ toTaskId);
+
+ ImmutableMap.Builder<String, TaskStateMachine> stateMachines = ImmutableMap.builder();
+ for (String id : taskIds) {
+ // An ID with no stored task yields null here; getStateMachine() accepts that.
+ IScheduledTask storedTask = tasksById.get(id);
+ stateMachines.put(id, getStateMachine(id, storedTask));
+ }
+ return stateMachines.build();
+ }
+ });
+ }
+
+ /**
+ * Creates the state machine for a task ID.
+ *
+ * @param taskId ID of the task.
+ * @param task Stored task record, or {@code null} if the task is not present in storage.
+ * @return A state machine starting in the task's stored status, or one that has been moved
+ * from INIT to UNKNOWN when there is no stored record.
+ */
+ private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
+ if (task == null) {
+ // The task is unknown, not present in storage.
+ TaskStateMachine unknownTask = new TaskStateMachine(taskId, null, workSink, clock, INIT);
+ unknownTask.updateState(UNKNOWN);
+ return unknownTask;
+ }
+
+ return createStateMachine(task, task.getStatus());
+ }
+
+ /**
+ * Creates a state machine for the given task that starts in the INIT state.
+ *
+ * @param task Task the state machine is created for.
+ * @return The new state machine.
+ */
+ private TaskStateMachine createStateMachine(IScheduledTask task) {
+ return createStateMachine(task, INIT);
+ }
+
+ /**
+ * Creates a state machine for the given task, wired to this manager's work sink and clock.
+ *
+ * @param task Task the state machine is created for; its ID is obtained via Tasks.id().
+ * @param initialState State passed to the state machine as its initial state.
+ * @return The new state machine.
+ */
+ private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
+ return new TaskStateMachine(
+ Tasks.id(task),
+ task,
+ workSink,
+ clock,
+ initialState);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
new file mode 100644
index 0000000..870d085
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+
+import com.twitter.aurora.scheduler.MesosTaskFactory;
+import com.twitter.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
+import com.twitter.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
+import com.twitter.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
+
+/**
+ * Binding module for scheduling logic and higher-level state management.
+ */
+public class StateModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(TaskAssigner.class).to(TaskAssignerImpl.class);
+ bind(TaskAssignerImpl.class).in(Singleton.class);
+ bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class);
+
+ bind(SchedulerCore.class).to(SchedulerCoreImpl.class).in(Singleton.class);
+
+ bind(StateManager.class).to(StateManagerImpl.class);
+ bind(StateManagerImpl.class).in(Singleton.class);
+
+ bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
+ bind(UUIDGeneratorImpl.class).in(Singleton.class);
+ bind(LockManager.class).to(LockManagerImpl.class);
+ bind(LockManagerImpl.class).in(Singleton.class);
+
+ bindCronJobManager(binder());
+ bind(ImmediateJobManager.class).in(Singleton.class);
+
+ bindMaintenanceController(binder());
+ }
+
+ @VisibleForTesting
+ static void bindCronJobManager(Binder binder) {
+ binder.bind(CronJobManager.class).in(Singleton.class);
+ PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
+ }
+
+ @VisibleForTesting
+ static void bindMaintenanceController(Binder binder) {
+ binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
+ binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
+ PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
new file mode 100644
index 0000000..c37feae
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.scheduler.MesosTaskFactory;
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Responsible for matching a task against an offer.
+ */
+public interface TaskAssigner {
+
+ /**
+ * Tries to match a task against an offer. If a match is found, the assigner should
+ * make the appropriate changes to the task and provide a non-empty result.
+ *
+ * @param offer The resource offer.
+ * @param task The task to match against and optionally assign.
+ * @return Instructions for launching the task if matching and assignment were successful.
+ */
+ Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
+
+ /**
+ * Assigner that accepts an offer when the scheduling filter raises no vetoes, records the
+ * assignment through the state manager, and builds the launch instructions with the task
+ * factory.
+ */
+ class TaskAssignerImpl implements TaskAssigner {
+ private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
+
+ private final StateManager stateManager;
+ private final SchedulingFilter filter;
+ private final MesosTaskFactory taskFactory;
+
+ @Inject
+ public TaskAssignerImpl(
+ StateManager stateManager,
+ SchedulingFilter filter,
+ MesosTaskFactory taskFactory) {
+
+ this.stateManager = checkNotNull(stateManager);
+ this.filter = checkNotNull(filter);
+ this.taskFactory = checkNotNull(taskFactory);
+ }
+
+ /**
+ * Records the assignment of the task to the offer's slave and builds the launch instructions.
+ *
+ * @param offer Offer the task is being placed on.
+ * @param task Task to assign.
+ * @return Launch instructions, or absent if the state manager did not produce an assigned
+ * task record.
+ */
+ private Optional<TaskInfo> assign(Offer offer, IScheduledTask task) {
+ String host = offer.getHostname();
+ Set<Integer> selectedPorts =
+ Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
+ IAssignedTask assigned = stateManager.assignTask(
+ Tasks.id(task),
+ host,
+ offer.getSlaveId(),
+ selectedPorts);
+ if (assigned == null) {
+ // StateManager.assignTask() returns null when it has no updated record to hand back.
+ // Report that as "not assigned" instead of passing null on to the task factory.
+ LOG.warning(String.format(
+ "Task %s could not be assigned to slave %s (id %s): no assigned task was returned.",
+ Tasks.id(task), host, offer.getSlaveId()));
+ return Optional.absent();
+ }
+ LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
+ host, offer.getSlaveId(), Tasks.id(task)));
+ return Optional.of(taskFactory.createFrom(assigned, offer.getSlaveId()));
+ }
+
+ @Override
+ public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
+ Set<Veto> vetoes = filter.filter(
+ ResourceSlot.from(offer),
+ offer.getHostname(),
+ task.getAssignedTask().getTask(),
+ Tasks.id(task));
+ if (vetoes.isEmpty()) {
+ return assign(offer, task);
+ } else {
+ LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
+ + ": " + vetoes);
+ return Optional.absent();
+ }
+ }
+ }
+}