You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:30 UTC
[37/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
new file mode 100644
index 0000000..95334ff
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.gen.HostStatus;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.Driver;
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
+import com.twitter.aurora.scheduler.state.MaintenanceController;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+import static com.twitter.aurora.gen.MaintenanceMode.NONE;
+import static com.twitter.aurora.gen.MaintenanceMode.SCHEDULED;
+
+/**
+ * Tracks the resource offers currently held by the scheduler.
+ */
+public interface OfferQueue extends EventSubscriber {
+
+  /**
+   * Notifies the scheduler of a new resource offer.
+   *
+   * @param offer Newly-available resource offer.
+   */
+  void addOffer(Offer offer);
+
+  /**
+   * Invalidates an offer. This indicates that the scheduler should not attempt to match any
+   * tasks against the offer.
+   *
+   * @param offer Canceled offer.
+   */
+  void cancelOffer(OfferID offer);
+
+  /**
+   * Launches the first task that satisfies the {@code acceptor} by returning a {@link TaskInfo}.
+   *
+   * @param acceptor Function that determines if an offer is accepted.
+   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
+   *     {@code acceptor}.
+   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
+   *     task.
+   */
+  boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
+
+  /**
+   * Notifies the offer queue that a host has changed state.
+   *
+   * @param change State change notification.
+   */
+  void hostChangedState(HostMaintenanceStateChange change);
+
+  /**
+   * Gets the offers that the scheduler is holding.
+   *
+   * @return A snapshot of the offers that the scheduler is currently holding.
+   */
+  Iterable<Offer> getOffers();
+
+  /**
+   * Calculates the amount of time before an offer should be 'returned' by declining it.
+   * The delay is calculated for each offer that is received, so the return delay may be
+   * fixed or variable.
+   */
+  interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
+  }
+
+  /**
+   * Thrown when there was an unexpected failure trying to launch a task.
+   */
+  class LaunchException extends Exception {
+    LaunchException(String msg) {
+      super(msg);
+    }
+
+    LaunchException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+
+  class OfferQueueImpl implements OfferQueue {
+    private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
+
+    // Orders offers by host maintenance mode (hosts in NONE first, DRAINED last). The
+    // Ordering.arbitrary() tiebreak keeps distinct offers with the same mode from comparing as
+    // equal, which matters because a ConcurrentSkipListSet treats comparator-equal elements as
+    // duplicates.
+    static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
+        // Currently, the only preference is based on host maintenance status.
+        Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
+            .onResultOf(new Function<HostOffer, MaintenanceMode>() {
+              @Override public MaintenanceMode apply(HostOffer offer) {
+                return offer.mode;
+              }
+            })
+            .compound(Ordering.arbitrary());
+
+    private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+    private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
+
+    private final Driver driver;
+    private final OfferReturnDelay returnDelay;
+    private final ScheduledExecutorService executor;
+    private final MaintenanceController maintenance;
+
+    @Inject
+    OfferQueueImpl(Driver driver,
+        OfferReturnDelay returnDelay,
+        ScheduledExecutorService executor,
+        MaintenanceController maintenance) {
+
+      this.driver = driver;
+      this.returnDelay = returnDelay;
+      this.executor = executor;
+      this.maintenance = maintenance;
+      // Potential gotcha - since this is now a ConcurrentSkipListSet, size() is more expensive.
+      // Could track this separately if it turns out to pose problems.
+      Stats.exportSize("outstanding_offers", hostOffers);
+    }
+
+    /**
+     * Holds the offer and schedules its return after the configured delay. If offers for the
+     * same slave are already held, the new offer and all held same-slave offers are declined
+     * instead, so the master can compact them into a single offer.
+     */
+    @Override
+    public void addOffer(final Offer offer) {
+      // We run a slight risk of a race here, which is acceptable. The worst case is that we
+      // temporarily hold two offers for the same host, which should be corrected when we return
+      // them after the return delay.
+      // There's also a chance that we return an offer for compaction ~simultaneously with the
+      // same-host offer being canceled/returned. This is also fine.
+      List<HostOffer> sameSlave = FluentIterable.from(hostOffers)
+          .filter(new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer hostOffer) {
+              return hostOffer.offer.getSlaveId().equals(offer.getSlaveId());
+            }
+          })
+          .toList();
+      if (sameSlave.isEmpty()) {
+        hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
+        executor.schedule(
+            new Runnable() {
+              @Override public void run() {
+                removeAndDecline(offer.getId());
+              }
+            },
+            returnDelay.get().as(Time.MILLISECONDS),
+            TimeUnit.MILLISECONDS);
+      } else {
+        // If there are existing offers for the slave, decline all of them so the master can
+        // compact all of those offers into a single offer and send them back.
+        LOG.info("Returning " + (sameSlave.size() + 1)
+            + " offers for " + offer.getSlaveId().getValue() + " for compaction.");
+        decline(offer.getId());
+        for (HostOffer sameSlaveOffer : sameSlave) {
+          removeAndDecline(sameSlaveOffer.offer.getId());
+        }
+      }
+    }
+
+    /**
+     * Declines the offer only if it was still held; offers already launched against or
+     * canceled are left alone.
+     */
+    void removeAndDecline(OfferID id) {
+      if (removeFromHostOffers(id)) {
+        decline(id);
+      }
+    }
+
+    void decline(OfferID id) {
+      LOG.fine("Declining offer " + id);
+      driver.declineOffer(id);
+    }
+
+    @Override
+    public void cancelOffer(final OfferID offerId) {
+      removeFromHostOffers(offerId);
+    }
+
+    /**
+     * Removes every held offer with the given ID.
+     *
+     * @return {@code true} if at least one offer was removed.
+     */
+    private boolean removeFromHostOffers(final OfferID offerId) {
+      Preconditions.checkNotNull(offerId);
+
+      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+      // on an offer, the master will mark the task as LOST and it will be retried.
+      return Iterables.removeIf(hostOffers,
+          new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer input) {
+              return input.offer.getId().equals(offerId);
+            }
+          });
+    }
+
+    @Override
+    public Iterable<Offer> getOffers() {
+      // Copy into an immutable list so callers receive the snapshot promised by the interface
+      // rather than a live view of a set that other threads keep modifying.
+      return FluentIterable.from(hostOffers)
+          .transform(new Function<HostOffer, Offer>() {
+            @Override public Offer apply(HostOffer hostOffer) {
+              return hostOffer.offer;
+            }
+          })
+          .toList();
+    }
+
+    /**
+     * Updates the preference of a host's offers.
+     *
+     * @param change Host change notification.
+     */
+    @Subscribe
+    @Override
+    public void hostChangedState(HostMaintenanceStateChange change) {
+      final HostStatus hostStatus = change.getStatus();
+
+      // Remove and re-add a host's offers to re-sort them based on the host's new mode.
+      Set<HostOffer> changedOffers = FluentIterable.from(hostOffers)
+          .filter(new Predicate<HostOffer>() {
+            @Override public boolean apply(HostOffer hostOffer) {
+              return hostOffer.offer.getHostname().equals(hostStatus.getHost());
+            }
+          })
+          .toSet();
+      for (HostOffer changed : changedOffers) {
+        // Only re-add an offer that this call actually removed. If the offer was consumed by
+        // launchFirst(), canceled, or returned after the snapshot above was taken, re-adding it
+        // would resurrect an offer that is no longer valid. A narrower window remains between
+        // remove() and add(), during which a concurrent cancel will not see the offer.
+        if (hostOffers.remove(changed)) {
+          hostOffers.add(new HostOffer(changed.offer, hostStatus.getMode()));
+        }
+      }
+    }
+
+    /**
+     * Notifies the queue that the driver is disconnected, and all the stored offers are now
+     * invalid.
+     * <p>
+     * The queue takes this as a signal to flush its queue.
+     *
+     * @param event Disconnected event.
+     */
+    @Subscribe
+    public void driverDisconnected(DriverDisconnected event) {
+      LOG.info("Clearing stale offers since the driver is disconnected.");
+      hostOffers.clear();
+    }
+
+    /**
+     * Encapsulate an offer from a host, and the host's maintenance mode.
+     */
+    private static class HostOffer {
+      private final Offer offer;
+      private final MaintenanceMode mode;
+
+      HostOffer(Offer offer, MaintenanceMode mode) {
+        this.offer = offer;
+        this.mode = mode;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof HostOffer)) {
+          return false;
+        }
+        HostOffer other = (HostOffer) o;
+        return Objects.equal(offer, other.offer) && (mode == other.mode);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hashCode(offer, mode);
+      }
+    }
+
+    @Override
+    public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
+        throws LaunchException {
+
+      // It's important that this method is not called concurrently - doing so would open up the
+      // possibility of a race between the same offers being accepted by different threads.
+
+      // Iteration follows PREFERENCE_COMPARATOR, so offers from hosts not in maintenance are
+      // tried first.
+      for (HostOffer hostOffer : hostOffers) {
+        Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
+        if (assignment.isPresent()) {
+          // Guard against an offer being removed after we grabbed it from the iterator.
+          // If that happens, the offer will not exist in hostOffers, and we signal the caller
+          // with a LaunchException instead of launching against a stale offer.
+          if (hostOffers.remove(hostOffer)) {
+            try {
+              driver.launchTask(hostOffer.offer.getId(), assignment.get());
+              return true;
+            } catch (IllegalStateException e) {
+              // TODO(William Farner): Catch only the checked exception produced by Driver
+              // once it changes from throwing IllegalStateException when the driver is not yet
+              // registered.
+              throw new LaunchException("Failed to launch task.", e);
+            }
+          } else {
+            offerRaces.incrementAndGet();
+            throw new LaunchException(
+                "Accepted offer no longer exists in offer queue, likely data race.");
+          }
+        }
+      }
+
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
new file mode 100644
index 0000000..a01790c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -0,0 +1,411 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import org.apache.mesos.Protos.Offer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.PREEMPTING;
+import static com.twitter.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * Preempts active tasks in favor of higher priority tasks.
+ */
+public interface Preemptor {
+
+  /**
+   * Preempts active tasks in favor of the input task.
+   *
+   * @param taskId ID of the preempting task.
+   * @return ID of the slave where a slot was found (possibly without preempting anything, if the
+   *     slave's outstanding offers were already sufficient), or absent if no slot was found or the
+   *     task is not an eligible PENDING task.
+   */
+  Optional<String> findPreemptionSlotFor(String taskId);
+
+  /**
+   * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
+   * priority than tasks that are currently running.
+   * <p>
+   * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
+   * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
+   * tasks.
+   */
+  class PreemptorImpl implements Preemptor {
+
+    /**
+     * Binding annotation for the time interval after which a pending task becomes eligible to
+     * preempt other tasks.
+     */
+    @BindingAnnotation
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    @interface PreemptionDelay { }
+
+    // Candidate victims: active tasks that are neither PENDING nor already PREEMPTING.
+    @VisibleForTesting
+    static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
+        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
+
+    private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
+
+    private static final Function<IAssignedTask, Integer> GET_PRIORITY =
+        new Function<IAssignedTask, Integer>() {
+          @Override public Integer apply(IAssignedTask task) {
+            return task.getTask().getPriority();
+          }
+        };
+
+    private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
+    private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
+    // Incremented every time the preemptor is invoked for an eligible pending task and at least
+    // one candidate victim exists.
+    private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
+    // Incremented every time no usable slot is found for a pending task.
+    private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
+
+    // Matches tasks whose most recent event is at least preemptionCandidacyDelay old. Applied to
+    // PENDING tasks, this enforces the candidacy delay before a task may preempt others.
+    private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
+            >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+      }
+    };
+
+    private final Storage storage;
+    private final SchedulerCore scheduler;
+    private final OfferQueue offerQueue;
+    private final SchedulingFilter schedulingFilter;
+    private final Amount<Long, Time> preemptionCandidacyDelay;
+    private final Clock clock;
+
+    /**
+     * Creates a new preemptor.
+     *
+     * @param storage Backing store for tasks.
+     * @param scheduler Scheduler to fetch task information from, and instruct when preempting
+     *     tasks.
+     * @param offerQueue Queue that contains available Mesos resource offers.
+     * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
+     * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
+     *     tasks.
+     * @param clock Clock to check current time.
+     */
+    @Inject
+    PreemptorImpl(
+        Storage storage,
+        SchedulerCore scheduler,
+        OfferQueue offerQueue,
+        SchedulingFilter schedulingFilter,
+        @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+        Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.scheduler = checkNotNull(scheduler);
+      this.offerQueue = checkNotNull(offerQueue);
+      this.schedulingFilter = checkNotNull(schedulingFilter);
+      this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
+      this.clock = checkNotNull(clock);
+    }
+
+    /** Fetches tasks matching {@code query} and {@code filter}, as a mutable list. */
+    private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
+      return Lists.newArrayList(Iterables.transform(Iterables.filter(
+          Storage.Util.consistentFetchTasks(storage, query), filter),
+          SCHEDULED_TO_ASSIGNED));
+    }
+
+    private List<IAssignedTask> fetch(Query.Builder query) {
+      return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
+    }
+
+    private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
+        new Function<IAssignedTask, String>() {
+          @Override public String apply(IAssignedTask input) {
+            return input.getSlaveId();
+          }
+        };
+
+    private static final Function<IAssignedTask, String> TASK_TO_HOST =
+        new Function<IAssignedTask, String>() {
+          @Override public String apply(IAssignedTask input) {
+            return input.getSlaveHost();
+          }
+        };
+
+    /** Matches possible victims that {@code pending} is allowed to preempt. */
+    private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
+      return new Predicate<IAssignedTask>() {
+        @Override public boolean apply(IAssignedTask possibleVictim) {
+          return preemptionFilter(possibleVictim).apply(pending);
+        }
+      };
+    }
+
+    private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
+        new Function<IAssignedTask, ResourceSlot>() {
+          @Override public ResourceSlot apply(IAssignedTask input) {
+            return ResourceSlot.from(input.getTask());
+          }
+        };
+
+    private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<Offer, ResourceSlot>() {
+          @Override public ResourceSlot apply(Offer offer) {
+            return ResourceSlot.from(offer);
+          }
+        };
+
+    private static final Function<Offer, String> OFFER_TO_HOST =
+        new Function<Offer, String>() {
+          @Override public String apply(Offer offer) {
+            return offer.getHostname();
+          }
+        };
+
+    // Victims are considered in the reverse of ResourceSlot.ORDER (presumably largest first —
+    // confirm against ResourceSlot.ORDER).
+    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+    // ordering
+    private static final Ordering<IAssignedTask> RESOURCE_ORDER =
+        ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+
+    /**
+     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
+     * The empty set indicates the offers (slack) are enough.
+     * A set with elements indicates those tasks and the offers are enough.
+     */
+    private Optional<Set<IAssignedTask>> getTasksToPreempt(
+        Iterable<IAssignedTask> possibleVictims,
+        Iterable<Offer> offers,
+        IAssignedTask pendingTask) {
+
+      // This enforces the precondition that all of the resources are from the same host. We need to
+      // get the host for the schedulingFilter.
+      Set<String> hosts = ImmutableSet.<String>builder()
+          .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
+          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
+
+      String host = Iterables.getOnlyElement(hosts);
+
+      ResourceSlot slackResources =
+          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
+
+      if (!Iterables.isEmpty(offers)) {
+        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+            slackResources,
+            host,
+            pendingTask.getTask(),
+            pendingTask.getTaskId());
+
+        if (vetos.isEmpty()) {
+          return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
+        }
+      }
+
+      FluentIterable<IAssignedTask> preemptableTasks =
+          FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
+
+      if (preemptableTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+
+      Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+
+      // Greedily add victims until their resources plus the slack satisfy the scheduling filter.
+      for (IAssignedTask victim : sortedVictims) {
+        toPreemptTasks.add(victim);
+
+        ResourceSlot totalResource = ResourceSlot.sum(
+            ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+            slackResources);
+
+        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+            totalResource,
+            host,
+            pendingTask.getTask(),
+            pendingTask.getTaskId());
+
+        if (vetos.isEmpty()) {
+          return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
+        }
+      }
+      return Optional.absent();
+    }
+
+    private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
+        new Function<Offer, String>() {
+          @Override public String apply(Offer offer) {
+            return offer.getSlaveId().getValue();
+          }
+        };
+
+    private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+      // Only non-pending active tasks may be preempted.
+      List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
+
+      // Walk through the preemption candidates in reverse scheduling order.
+      Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
+
+      // Group the tasks by slave id so they can be paired with offers from the same slave.
+      return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+    }
+
+    @Override
+    public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
+      List<IAssignedTask> pendingTasks =
+          fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
+
+      // Task is no longer PENDING (or has not waited long enough), no need to preempt.
+      if (pendingTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
+
+      Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
+
+      if (slavesToActiveTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      attemptedPreemptions.incrementAndGet();
+
+      // Group the offers by slave id so they can be paired with active tasks from the same slave.
+      Multimap<String, Offer> slavesToOffers =
+          Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
+
+      Set<String> allSlaves = ImmutableSet.<String>builder()
+          .addAll(slavesToOffers.keySet())
+          .addAll(slavesToActiveTasks.keySet())
+          .build();
+
+      for (String slaveID : allSlaves) {
+        Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
+            slavesToActiveTasks.get(slaveID),
+            slavesToOffers.get(slaveID),
+            pendingTask);
+
+        if (toPreemptTasks.isPresent()) {
+          try {
+            for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+              scheduler.preemptTask(toPreempt, pendingTask);
+              tasksPreempted.incrementAndGet();
+            }
+            return Optional.of(slaveID);
+          } catch (ScheduleException e) {
+            // Victims preempted before the failure are not rolled back; the search simply moves
+            // on to the next slave.
+            LOG.log(Level.SEVERE, "Preemption failed", e);
+            failedPreemptions.incrementAndGet();
+          }
+        }
+      }
+
+      noSlotsFound.incrementAndGet();
+      return Optional.absent();
+    }
+
+    private static final Predicate<IAssignedTask> IS_PRODUCTION =
+        Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
+
+    /**
+     * Creates a filter that identifies tasks allowed to preempt {@code preemptableTask}.
+     * A task may preempt {@code preemptableTask} if either:
+     * <ul>
+     *   <li>the task is production and {@code preemptableTask} is non-production, or</li>
+     *   <li>both tasks are owned by the same role and the task's priority is strictly higher
+     *       than that of {@code preemptableTask}.</li>
+     * </ul>
+     * Resource sufficiency is not checked here; that is handled by {@code getTasksToPreempt}.
+     *
+     * @param preemptableTask Task to possibly preempt.
+     * @return A filter over candidate preempting tasks, comparing their production status, owner
+     *     and priority with {@code preemptableTask}.
+     */
+    private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
+      // A production task can never be preempted under the production-over-non-production rule.
+      Predicate<IAssignedTask> productionFilter = preemptableTask.getTask().isProduction()
+          ? Predicates.<IAssignedTask>alwaysFalse()
+          : IS_PRODUCTION;
+
+      Predicate<IAssignedTask> priorityFilter =
+          greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
+      return Predicates.or(
+          productionFilter,
+          Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter));
+    }
+
+    private static Predicate<IAssignedTask> isOwnedBy(final String role) {
+      return new Predicate<IAssignedTask>() {
+        @Override public boolean apply(IAssignedTask task) {
+          return getRole(task).equals(role);
+        }
+      };
+    }
+
+    private static String getRole(IAssignedTask task) {
+      return task.getTask().getOwner().getRole();
+    }
+
+    private static Predicate<Integer> greaterThan(final int value) {
+      return new Predicate<Integer>() {
+        @Override public boolean apply(Integer input) {
+          return input > value;
+        }
+      };
+    }
+
+    private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
+      return Predicates.compose(greaterThan(priority), GET_PRIORITY);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
new file mode 100644
index 0000000..eefc03a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+
+/**
+ * Calculates scheduling delays for tasks.
+ */
+interface RescheduleCalculator {
+  /**
+   * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getStartupReadyTimeMs(IScheduledTask task);
+
+  /**
+   * Gets a timestamp for the task to become eligible for (re)scheduling.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getReadyTimeMs(IScheduledTask task);
+
+  class RescheduleCalculatorImpl implements RescheduleCalculator {
+
+    private static final Logger LOG = Logger.getLogger(RescheduleCalculatorImpl.class.getName());
+
+    private final Storage storage;
+    private final RescheduleCalculatorSettings settings;
+    private final Clock clock;
+    private final Random random = new Random.SystemRandom(new java.util.Random());
+
+    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
+        Predicates.in(Tasks.ACTIVE_STATES);
+
+    private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
+        new Function<ITaskEvent, ScheduleStatus>() {
+          @Override public ScheduleStatus apply(ITaskEvent input) {
+            return input.getStatus();
+          }
+        };
+
+    // Tasks that passed through any of these states were stopped by outside action and are
+    // never treated as flapping.
+    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
+        EnumSet.of(RESTARTING, KILLING);
+
+    /**
+     * Matches a terminated task whose most recent active event precedes its terminal event by
+     * less than the configured flapping threshold.
+     */
+    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        // With no recorded events there is nothing to measure, so the task is not flapping.
+        if (!task.isSetTaskEvents() || task.getTaskEvents().isEmpty()) {
+          return false;
+        }
+
+        // Most recent event first.
+        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
+
+        // Avoid penalizing tasks that were interrupted by outside action, such as a user
+        // restarting them.
+        if (Iterables.any(Iterables.transform(events, TO_STATUS),
+            Predicates.in(INTERRUPTED_TASK_STATES))) {
+          return false;
+        }
+
+        ITaskEvent terminalEvent = events.get(0);
+        ScheduleStatus terminalState = terminalEvent.getStatus();
+        Preconditions.checkState(Tasks.isTerminated(terminalState));
+
+        // The most recent event whose status is active.
+        ITaskEvent activeEvent =
+            Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+
+        long thresholdMs = settings.flappingTaskThreshold.as(Time.MILLISECONDS);
+
+        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
+      }
+    };
+
+    static class RescheduleCalculatorSettings {
+      private final BackoffStrategy flappingTaskBackoff;
+      private final Amount<Long, Time> flappingTaskThreshold;
+      private final Amount<Integer, Time> maxStartupRescheduleDelay;
+
+      RescheduleCalculatorSettings(
+          BackoffStrategy flappingTaskBackoff,
+          Amount<Long, Time> flappingTaskThreshold,
+          Amount<Integer, Time> maxStartupRescheduleDelay) {
+
+        this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
+        this.flappingTaskThreshold = checkNotNull(flappingTaskThreshold);
+        this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
+      }
+    }
+
+    @Inject
+    RescheduleCalculatorImpl(
+        Storage storage,
+        RescheduleCalculatorSettings settings,
+        Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.settings = checkNotNull(settings);
+      this.clock = checkNotNull(clock);
+    }
+
+    /**
+     * Adds a random jitter in {@code [0, maxStartupRescheduleDelay)} to the regular ready time.
+     */
+    @Override
+    public long getStartupReadyTimeMs(IScheduledTask task) {
+      int maxDelayMs = settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS);
+      // The wrapped java.util.Random.nextInt(bound) rejects a non-positive bound, so a zero
+      // maximum delay means "no jitter" instead of an exception.
+      long jitterMs = maxDelayMs > 0 ? random.nextInt(maxDelayMs) : 0;
+      return jitterMs + getTaskReadyTimestamp(task);
+    }
+
+    @Override
+    public long getReadyTimeMs(IScheduledTask task) {
+      return getTaskReadyTimestamp(task);
+    }
+
+    /** Looks up the task's ancestor, if it has one and it is still in storage. */
+    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
+      if (!task.isSetAncestorId()) {
+        return Optional.absent();
+      }
+
+      ImmutableSet<IScheduledTask> res =
+          Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+
+      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+    }
+
+    /**
+     * Returns now plus a penalty that grows, via the flapping backoff strategy, for each
+     * consecutive flapping ancestor of {@code task}.
+     */
+    private long getTaskReadyTimestamp(IScheduledTask task) {
+      Optional<IScheduledTask> curTask = getTaskAncestor(task);
+      long penaltyMs = 0;
+      while (curTask.isPresent() && flapped.apply(curTask.get())) {
+        LOG.info(
+            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
+        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
+        // If the backoff strategy is truncated then there is no need for us to continue.
+        if (newPenalty == penaltyMs) {
+          break;
+        }
+        penaltyMs = newPenalty;
+        curTask = getTaskAncestor(curTask.get());
+      }
+
+      return penaltyMs + clock.nowMillis();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
new file mode 100644
index 0000000..018022b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.scheduler.async.TaskGroups.GroupKey;
+import com.twitter.common.base.Function;
+import com.twitter.common.util.BackoffStrategy;
+
+/**
+ * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
+ */
+class TaskGroup {
+ private final GroupKey key;
+ private final BackoffStrategy backoffStrategy;
+
+ private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
+ @Override public Long apply(Task item) {
+ return item.readyTimestampMs;
+ }
+ };
+
+ // Order the tasks by the time they are ready to be scheduled
+ private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
+ // 11 is the magic number used by PriorityBlockingQueue as the initial size.
+ private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
+ // Penalty for the task group for failing to schedule.
+ private final AtomicLong penaltyMs;
+
+ TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
+ this.key = key;
+ this.backoffStrategy = backoffStrategy;
+ penaltyMs = new AtomicLong();
+ resetPenaltyAndGet();
+ }
+
+ GroupKey getKey() {
+ return key;
+ }
+
+ private static final Function<Task, String> TO_TASK_ID =
+ new Function<Task, String>() {
+ @Override public String apply(Task item) {
+ return item.taskId;
+ }
+ };
+
+ /**
+ * Removes the task at the head of the queue.
+ *
+ * @return String the id of the head task.
+ * @throws IllegalStateException if the queue is empty.
+ */
+ String pop() throws IllegalStateException {
+ Task head = tasks.poll();
+ Preconditions.checkState(head != null);
+ return head.taskId;
+ }
+
+ void remove(String taskId) {
+ Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
+ }
+
+ void push(final String taskId, long readyTimestamp) {
+ tasks.offer(new Task(taskId, readyTimestamp));
+ }
+
+ synchronized long resetPenaltyAndGet() {
+ penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
+ return getPenaltyMs();
+ }
+
+ synchronized long penalizeAndGet() {
+ penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
+ return getPenaltyMs();
+ }
+
+ GroupState isReady(long nowMs) {
+ Task task = tasks.peek();
+ if (task == null) {
+ return GroupState.EMPTY;
+ }
+
+ if (task.readyTimestampMs > nowMs) {
+ return GroupState.NOT_READY;
+ }
+ return GroupState.READY;
+ }
+ // Begin methods used for debug interfaces.
+
+ public String getName() {
+ return key.toString();
+ }
+
+ // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
+ public Set<String> getTaskIds() {
+ return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
+ }
+
+ public long getPenaltyMs() {
+ return penaltyMs.get();
+ }
+
+ private static class Task {
+ private final String taskId;
+ private final long readyTimestampMs;
+
+ Task(String taskId, long readyTimestampMs) {
+ this.taskId = Preconditions.checkNotNull(taskId);
+ this.readyTimestampMs = readyTimestampMs;
+ }
+ }
+
+ enum GroupState {
+ EMPTY, // The group is empty.
+ NOT_READY, // Every task in the group is not ready yet.
+ READY // The task at the head of the queue is ready.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
new file mode 100644
index 0000000..a59e5c8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
+
+/**
+ * A collection of task groups, where a task group is a collection of tasks that are known to be
+ * equal in the way they schedule. This is expected to be tasks associated with the same job key,
+ * who also have {@code equal()} {@link ITaskConfig} values.
+ * <p>
+ * This is used to prevent redundant work in trying to schedule tasks as well as to provide
+ * nearly-equal responsiveness when scheduling across jobs. In other words, a 1000 instance job
+ * cannot starve a 1 instance job.
+ */
+public class TaskGroups implements EventSubscriber {
+
+ private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
+
+ private final Storage storage;
+ private final LoadingCache<GroupKey, TaskGroup> groups;
+ private final Clock clock;
+ private final RescheduleCalculator rescheduleCalculator;
+
+ static class TaskGroupsSettings {
+ private final BackoffStrategy taskGroupBackoff;
+ private final RateLimiter rateLimiter;
+
+ TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
+ this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
+ this.rateLimiter = checkNotNull(rateLimiter);
+ }
+ }
+
+ @Inject
+ TaskGroups(
+ ShutdownRegistry shutdownRegistry,
+ Storage storage,
+ TaskGroupsSettings settings,
+ TaskScheduler taskScheduler,
+ Clock clock,
+ RescheduleCalculator rescheduleCalculator) {
+
+ this(
+ createThreadPool(shutdownRegistry),
+ storage,
+ settings.taskGroupBackoff,
+ settings.rateLimiter,
+ taskScheduler,
+ clock,
+ rescheduleCalculator);
+ }
+
+ TaskGroups(
+ final ScheduledExecutorService executor,
+ final Storage storage,
+ final BackoffStrategy taskGroupBackoffStrategy,
+ final RateLimiter rateLimiter,
+ final TaskScheduler taskScheduler,
+ final Clock clock,
+ final RescheduleCalculator rescheduleCalculator) {
+
+ this.storage = checkNotNull(storage);
+ checkNotNull(executor);
+ checkNotNull(taskGroupBackoffStrategy);
+ checkNotNull(rateLimiter);
+ checkNotNull(taskScheduler);
+ this.clock = checkNotNull(clock);
+ this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+
+ final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
+ @Override public TaskSchedulerResult schedule(String taskId) {
+ rateLimiter.acquire();
+ return taskScheduler.schedule(taskId);
+ }
+ };
+
+ groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
+ @Override public TaskGroup load(GroupKey key) {
+ TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
+ LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
+ startGroup(group, executor, ratelLimitedScheduler);
+ return group;
+ }
+ });
+ }
+
+ private synchronized boolean maybeInvalidate(TaskGroup group) {
+ if (group.getTaskIds().isEmpty()) {
+ groups.invalidate(group.getKey());
+ return true;
+ }
+ return false;
+ }
+
+ private void startGroup(
+ final TaskGroup group,
+ final ScheduledExecutorService executor,
+ final TaskScheduler taskScheduler) {
+
+ Runnable monitor = new Runnable() {
+ @Override public void run() {
+ GroupState state = group.isReady(clock.nowMillis());
+
+ switch (state) {
+ case EMPTY:
+ maybeInvalidate(group);
+ break;
+
+ case READY:
+ String id = group.pop();
+ TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
+ switch (result) {
+ case SUCCESS:
+ if (!maybeInvalidate(group)) {
+ executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
+ }
+ break;
+
+ case TRY_AGAIN:
+ group.push(id, clock.nowMillis());
+ executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
+ }
+ break;
+
+ case NOT_READY:
+ executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown GroupState " + state);
+ }
+ }
+ };
+ executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
+ // TODO(William Farner): Leverage ExceptionHandlingScheduledExecutorService:
+ // com.twitter.common.util.concurrent.ExceptionHandlingScheduledExecutorService
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskScheduler-%d").build());
+ Stats.exportSize("schedule_queue_size", executor.getQueue());
+ shutdownRegistry.addAction(new Command() {
+ @Override public void execute() {
+ new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+ }
+ });
+ return executor;
+ }
+
+ private synchronized void add(IAssignedTask task, long readyTimestamp) {
+ groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
+ }
+
+ /**
+ * Informs the task groups of a task state change.
+ * <p>
+ * This is used to observe {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks and begin
+ * attempting to schedule them.
+ *
+ * @param stateChange State change notification.
+ */
+ @Subscribe
+ public synchronized void taskChangedState(TaskStateChange stateChange) {
+ if (stateChange.getNewState() == PENDING) {
+ add(
+ stateChange.getTask().getAssignedTask(),
+ rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
+ }
+ }
+
+ /**
+ * Signals that storage has started and is consistent.
+ * <p>
+ * Upon this signal, all {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks in the stoage
+ * will become eligible for scheduling.
+ *
+ * @param event Storage started notification.
+ */
+ @Subscribe
+ public void storageStarted(StorageStarted event) {
+ for (IScheduledTask task
+ : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
+
+ add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
+ }
+ }
+
+ /**
+ * Signals the scheduler that tasks have been deleted.
+ *
+ * @param deleted Tasks deleted event.
+ */
+ @Subscribe
+ public synchronized void tasksDeleted(TasksDeleted deleted) {
+ for (IAssignedTask task
+ : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
+ TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
+ if (group != null) {
+ group.remove(task.getTaskId());
+ }
+ }
+ }
+
+ public Iterable<TaskGroup> getGroups() {
+ return ImmutableSet.copyOf(groups.asMap().values());
+ }
+
+ static class GroupKey {
+ private final ITaskConfig canonicalTask;
+
+ GroupKey(ITaskConfig task) {
+ this.canonicalTask = task;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(canonicalTask);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof GroupKey)) {
+ return false;
+ }
+ GroupKey other = (GroupKey) o;
+ return Objects.equal(canonicalTask, other.canonicalTask);
+ }
+
+ @Override
+ public String toString() {
+ return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
new file mode 100644
index 0000000..0ad9e13
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.BindingAnnotation;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.state.TaskAssigner;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.LOST;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+
+/**
+ * Enables scheduling and preemption of tasks.
+ */
+interface TaskScheduler extends EventSubscriber {
+
+ /**
+ * Attempts to schedule a task, possibly performing irreversible actions.
+ *
+ * @param taskId The task to attempt to schedule.
+ * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
+ * again if TRY_AGAIN is returned.
+ */
+ TaskSchedulerResult schedule(String taskId);
+
+ enum TaskSchedulerResult {
+ SUCCESS,
+ TRY_AGAIN
+ }
+
+ /**
+ * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
+ * backs off after a failed scheduling attempt.
+ * <p>
+ * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+ */
+ class TaskSchedulerImpl implements TaskScheduler {
+ /**
+ * Binding annotation for the time duration of reservations
+ */
+ @BindingAnnotation
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ @interface ReservationDuration { }
+
+ private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
+
+ private final Storage storage;
+ private final StateManager stateManager;
+ private final TaskAssigner assigner;
+ private final OfferQueue offerQueue;
+ private final Preemptor preemptor;
+ private final Reservations reservations;
+
+ private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
+ private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
+
+ @Inject
+ TaskSchedulerImpl(
+ Storage storage,
+ StateManager stateManager,
+ TaskAssigner assigner,
+ OfferQueue offerQueue,
+ Preemptor preemptor,
+ @ReservationDuration Amount<Long, Time> reservationDuration,
+ final Clock clock) {
+
+ this.storage = checkNotNull(storage);
+ this.stateManager = checkNotNull(stateManager);
+ this.assigner = checkNotNull(assigner);
+ this.offerQueue = checkNotNull(offerQueue);
+ this.preemptor = checkNotNull(preemptor);
+ this.reservations = new Reservations(reservationDuration, clock);
+ }
+
+ private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+ final String taskId,
+ final IScheduledTask task) {
+
+ return new Function<Offer, Optional<TaskInfo>>() {
+ @Override public Optional<TaskInfo> apply(Offer offer) {
+ Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
+ if (reservedTaskId.isPresent()) {
+ if (taskId.equals(reservedTaskId.get())) {
+ // Slave is reserved to satisfy this task.
+ return assigner.maybeAssign(offer, task);
+ } else {
+ // Slave is reserved for another task.
+ return Optional.absent();
+ }
+ } else {
+ // Slave is not reserved.
+ return assigner.maybeAssign(offer, task);
+ }
+ }
+ };
+ }
+
+ @VisibleForTesting
+ static final Optional<String> LAUNCH_FAILED_MSG =
+ Optional.of("Unknown exception attempting to schedule task.");
+
+ @Timed("task_schedule_attempt")
+ @Override
+ public TaskSchedulerResult schedule(final String taskId) {
+ scheduleAttemptsFired.incrementAndGet();
+ try {
+ return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
+ @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
+ LOG.fine("Attempting to schedule task " + taskId);
+ Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
+ final IScheduledTask task =
+ Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+ if (task == null) {
+ LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+ } else {
+ try {
+ if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+ // Task could not be scheduled.
+ maybePreemptFor(taskId);
+ return TaskSchedulerResult.TRY_AGAIN;
+ }
+ } catch (OfferQueue.LaunchException e) {
+ LOG.log(Level.WARNING, "Failed to launch task.", e);
+ scheduleAttemptsFailed.incrementAndGet();
+
+ // The attempt to schedule the task failed, so we need to backpedal on the
+ // assignment.
+ // It is in the LOST state and a new task will move to PENDING to replace it.
+ // Should the state change fail due to storage issues, that's okay. The task will
+ // time out in the ASSIGNED state and be moved to LOST.
+ stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+ }
+ }
+
+ return TaskSchedulerResult.SUCCESS;
+ }
+ });
+ } catch (RuntimeException e) {
+ // We catch the generic unchecked exception here to ensure tasks are not abandoned
+ // if there is a transient issue resulting in an unchecked exception.
+ LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
+ scheduleAttemptsFailed.incrementAndGet();
+ return TaskSchedulerResult.TRY_AGAIN;
+ }
+ }
+
+ private void maybePreemptFor(String taskId) {
+ if (reservations.hasReservationForTask(taskId)) {
+ return;
+ }
+ Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+ if (slaveId.isPresent()) {
+ this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
+ }
+ }
+
+ @Subscribe
+ public void taskChanged(final TaskStateChange stateChangeEvent) {
+ if (stateChangeEvent.getOldState() == PENDING) {
+ reservations.invalidateTask(stateChangeEvent.getTaskId());
+ }
+ }
+
+ private static class Reservations {
+ private final Cache<SlaveID, String> reservations;
+
+ Reservations(final Amount<Long, Time> duration, final Clock clock) {
+ checkNotNull(duration);
+ checkNotNull(clock);
+ this.reservations = CacheBuilder.newBuilder()
+ .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
+ .ticker(new Ticker() {
+ @Override public long read() {
+ return clock.nowNanos();
+ }
+ })
+ .build();
+ Stats.export(new StatImpl<Long>("reservation_cache_size") {
+ @Override public Long read() {
+ return reservations.size();
+ }
+ });
+ }
+
+ private synchronized void add(SlaveID slaveId, String taskId) {
+ reservations.put(slaveId, taskId);
+ }
+
+ private synchronized boolean hasReservationForTask(String taskId) {
+ return reservations.asMap().containsValue(taskId);
+ }
+
+ private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) {
+ return Optional.fromNullable(reservations.getIfPresent(slaveID));
+ }
+
+ private synchronized void invalidateTask(String taskId) {
+ reservations.asMap().values().remove(taskId);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
new file mode 100644
index 0000000..19848c7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck
+ * tasks will be transitioned to the LOST state.
+ */
+class TaskTimeout implements EventSubscriber {
+ private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
+
+ @VisibleForTesting
+ static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
+
+ @VisibleForTesting
+ static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
+
+ @VisibleForTesting
+ static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
+
+ @VisibleForTesting
+ static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
+ ScheduleStatus.ASSIGNED,
+ ScheduleStatus.PREEMPTING,
+ ScheduleStatus.RESTARTING,
+ ScheduleStatus.KILLING);
+
+ @VisibleForTesting
+ static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
+
+ private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
+
+ private static final class TimeoutKey {
+ private final String taskId;
+ private final ScheduleStatus status;
+
+ private TimeoutKey(String taskId, ScheduleStatus status) {
+ this.taskId = taskId;
+ this.status = status;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(taskId, status);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TimeoutKey)) {
+ return false;
+ }
+ TimeoutKey key = (TimeoutKey) o;
+ return Objects.equal(taskId, key.taskId)
+ && (status == key.status);
+ }
+
+ @Override
+ public String toString() {
+ return taskId + ":" + status;
+ }
+ }
+
+ private final Storage storage;
+ private final ScheduledExecutorService executor;
+ private final StateManager stateManager;
+ private final long timeoutMillis;
+ private final Clock clock;
+ private final AtomicLong timedOutTasks;
+
+ @Inject
+ TaskTimeout(
+ Storage storage,
+ ScheduledExecutorService executor,
+ StateManager stateManager,
+ final Clock clock,
+ Amount<Long, Time> timeout,
+ StatsProvider statsProvider) {
+
+ this.storage = checkNotNull(storage);
+ this.executor = checkNotNull(executor);
+ this.stateManager = checkNotNull(stateManager);
+ this.timeoutMillis = timeout.as(Time.MILLISECONDS);
+ this.clock = checkNotNull(clock);
+ this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
+
+ exportStats(statsProvider);
+ }
+
+ private void registerTimeout(TimeoutKey key) {
+ // This is an obvious check-then-act, but:
+ // - there isn't much of a better option, given that we have to get the Future before
+ // inserting into the map
+ // - a key collision only happens in practice if something is wrong externally to this class
+ // (double event for the same state)
+ // - the outcome is low-risk, we would wind up with a redundant Future that will eventually
+ // no-op
+ if (!futures.containsKey(key)) {
+ Future<?> timeoutHandler = executor.schedule(
+ new TimedOutTaskHandler(key),
+ timeoutMillis,
+ TimeUnit.MILLISECONDS);
+ futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
+ }
+ }
+
+ private static boolean isTransient(ScheduleStatus status) {
+ return TRANSIENT_STATES.contains(status);
+ }
+
+ @Subscribe
+ public void recordStateChange(TaskStateChange change) {
+ String taskId = change.getTaskId();
+ ScheduleStatus newState = change.getNewState();
+ if (isTransient(change.getOldState())) {
+ TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState());
+ Context context = futures.remove(oldKey);
+ if (context != null) {
+ LOG.fine("Canceling state timeout for task " + oldKey);
+ context.future.cancel(false);
+ }
+ }
+
+ if (isTransient(newState)) {
+ registerTimeout(new TimeoutKey(taskId, change.getNewState()));
+ }
+ }
+
+ @Subscribe
+ public void storageStarted(StorageStarted event) {
+ for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
+ registerTimeout(new TimeoutKey(Tasks.id(task), task.getStatus()));
+ }
+ }
+
+ private class TimedOutTaskHandler implements Runnable {
+ private final TimeoutKey key;
+
+ TimedOutTaskHandler(TimeoutKey key) {
+ this.key = key;
+ }
+
+ @Override public void run() {
+ Context context = futures.get(key);
+ try {
+ if (context == null) {
+ LOG.warning("Timeout context not found for " + key);
+ return;
+ }
+
+ LOG.info("Timeout reached for task " + key);
+ // This query acts as a CAS by including the state that we expect the task to be in if the
+ // timeout is still valid. Ideally, the future would have already been canceled, but in the
+ // event of a state transition race, including transientState prevents an unintended
+ // task timeout.
+ Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
+ // Note: This requires LOST transitions trigger Driver.killTask.
+ if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+ timedOutTasks.incrementAndGet();
+ } else {
+ LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
+ }
+ } finally {
+ futures.remove(key);
+ }
+ }
+ }
+
+ private class Context {
+ private final long timestampMillis;
+ private final Future<?> future;
+
+ Context(long timestampMillis, Future<?> future) {
+ this.timestampMillis = timestampMillis;
+ this.future = future;
+ }
+ }
+
+ private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
+ @Override public Long apply(Context context) {
+ return context.timestampMillis;
+ }
+ };
+
+ private static final Ordering<Context> TIMESTAMP_ORDER =
+ Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
+
+ @VisibleForTesting
+ static String waitingTimeStatName(ScheduleStatus status) {
+ return "scheduler_max_" + status + "_waiting_ms";
+ }
+
+ private void exportStats(StatsProvider statsProvider) {
+ statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
+ @Override public Number get() {
+ return futures.size();
+ }
+ });
+
+ for (final ScheduleStatus status : TRANSIENT_STATES) {
+ statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
+ private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
+ @Override public boolean apply(TimeoutKey key) {
+ return key.status == status;
+ }
+ };
+
+ @Override public Number get() {
+ Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
+ if (Iterables.isEmpty(matches)) {
+ return 0L;
+ } else {
+ return clock.nowMillis() - TIMESTAMP_ORDER.min(matches).timestampMillis;
+ }
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java b/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
new file mode 100644
index 0000000..b11c683
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/CommandUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.CommandInfo.URI;
+
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * Builds Mesos {@link CommandInfo} messages that fetch an executor from a URI and then run it.
+ */
+public final class CommandUtil {
+
+  private CommandUtil() {
+    // Not instantiable; static helpers only.
+  }
+
+  /**
+   * Returns the final path segment of {@code uri}, or {@code uri} unchanged when it contains no
+   * slash. A URI ending in a slash is rejected via {@code MorePreconditions.checkNotBlank}.
+   */
+  private static String uriBasename(String uri) {
+    int slashIndex = uri.lastIndexOf('/');
+    if (slashIndex < 0) {
+      return uri;
+    }
+
+    String trailing = uri.substring(slashIndex + 1);
+    MorePreconditions.checkNotBlank(trailing, "URI must not end with a slash.");
+    return trailing;
+  }
+
+  /**
+   * Creates a command that fetches {@code executorUri} (flagged executable) and then runs the
+   * fetched file as {@code ./<basename>}.
+   *
+   * @param executorUri URI to the executor binary; must not be blank.
+   * @return A command that will fetch and execute the executor.
+   */
+  public static CommandInfo create(String executorUri) {
+    MorePreconditions.checkNotBlank(executorUri);
+
+    URI.Builder fetchUri = URI.newBuilder()
+        .setValue(executorUri)
+        .setExecutable(true);
+    String launchCommand = "./" + uriBasename(executorUri);
+
+    return CommandInfo.newBuilder()
+        .addUris(fetchUri)
+        .setValue(launchCommand)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
new file mode 100644
index 0000000..2f84b5c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskState;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.ScheduleStatus;
+
+/**
+ * Collection of utility functions to convert mesos protobuf types to internal thrift types.
+ */
+public final class Conversions {
+
+  private static final Logger LOG = Logger.getLogger(Conversions.class.getName());
+
+  private Conversions() {
+    // Utility class.
+  }
+
+  // Maps from mesos state to scheduler interface state. Any TaskState absent from this table is
+  // rejected by convertProtoState.
+  private static final Map<TaskState, ScheduleStatus> STATE_TRANSLATION =
+      new ImmutableMap.Builder<TaskState, ScheduleStatus>()
+          .put(TaskState.TASK_STARTING, ScheduleStatus.STARTING)
+          .put(TaskState.TASK_RUNNING, ScheduleStatus.RUNNING)
+          .put(TaskState.TASK_FINISHED, ScheduleStatus.FINISHED)
+          .put(TaskState.TASK_FAILED, ScheduleStatus.FAILED)
+          .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED)
+          .put(TaskState.TASK_LOST, ScheduleStatus.LOST)
+          .build();
+
+  /**
+   * Converts a protobuf state to an internal schedule status.
+   *
+   * @param taskState Protobuf state.
+   * @return Equivalent thrift-generated state.
+   * @throws IllegalArgumentException if {@code taskState} has no entry in the translation table.
+   */
+  public static ScheduleStatus convertProtoState(TaskState taskState) {
+    ScheduleStatus status = STATE_TRANSLATION.get(taskState);
+    // Message template: the text is only formatted when the check fails, rather than being
+    // concatenated on every call.
+    Preconditions.checkArgument(status != null, "Unrecognized task state %s", taskState);
+    return status;
+  }
+
+  /** Extracts the name of a protobuf attribute; used as the grouping key in getAttributes. */
+  private static final Function<Protos.Attribute, String> ATTRIBUTE_NAME =
+      new Function<Protos.Attribute, String>() {
+        @Override public String apply(Protos.Attribute attr) {
+          return attr.getName();
+        }
+      };
+
+  /**
+   * Typedef to make anonymous implementation more concise.
+   */
+  private abstract static class AttributeConverter
+      implements Function<Entry<String, Collection<Protos.Attribute>>, Attribute> {
+  }
+
+  /**
+   * Renders a single protobuf attribute value as a string. Scalar values go through
+   * {@code String.valueOf}, and text values are passed through. Any other value type is logged
+   * at FINEST and mapped to {@code null} so that callers can filter it out.
+   */
+  private static final Function<Protos.Attribute, String> VALUE_CONVERTER =
+      new Function<Protos.Attribute, String>() {
+        @Override public String apply(Protos.Attribute attribute) {
+          switch (attribute.getType()) {
+            case SCALAR:
+              return String.valueOf(attribute.getScalar().getValue());
+
+            case TEXT:
+              return attribute.getText().getValue();
+
+            default:
+              LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring.");
+              return null;
+          }
+        }
+      };
+
+  /**
+   * Builds one thrift {@link Attribute} from a (name, protobuf attributes) group, keeping only
+   * the values VALUE_CONVERTER could render.
+   */
+  private static final AttributeConverter ATTRIBUTE_CONVERTER = new AttributeConverter() {
+    @Override public Attribute apply(Entry<String, Collection<Protos.Attribute>> entry) {
+      // Convert values and filter any that were ignored.
+      return new Attribute(
+          entry.getKey(),
+          FluentIterable.from(entry.getValue())
+              .transform(VALUE_CONVERTER)
+              .filter(Predicates.notNull())
+              .toSet());
+    }
+  };
+
+  /**
+   * Converts protobuf attributes into thrift-generated attributes.
+   *
+   * <p>Offer attributes are grouped by name, and each group becomes one thrift {@link Attribute}
+   * holding the converted values. Values of unsupported types are dropped. An attribute whose
+   * values were all dropped is still emitted, with an empty value set.
+   *
+   * @param offer Resource offer.
+   * @return Equivalent thrift host attributes, carrying the offer's hostname and slave id.
+   */
+  public static HostAttributes getAttributes(Offer offer) {
+    // Group by attribute name.
+    Multimap<String, Protos.Attribute> valuesByName =
+        Multimaps.index(offer.getAttributesList(), ATTRIBUTE_NAME);
+
+    return new HostAttributes(
+        offer.getHostname(),
+        FluentIterable.from(valuesByName.asMap().entrySet())
+            .transform(ATTRIBUTE_CONVERTER)
+            .toSet())
+        .setSlaveId(offer.getSlaveId().getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
new file mode 100644
index 0000000..008e1cb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.base;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import com.twitter.aurora.gen.JobKey;
+import com.twitter.aurora.gen.TaskQuery;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Utility class providing convenience functions relating to JobKeys.
+ */
+public final class JobKeys {
+  private JobKeys() {
+    // Utility class.
+  }
+
+  /** Extracts the key of a job configuration. */
+  public static final Function<IJobConfiguration, IJobKey> FROM_CONFIG =
+      new Function<IJobConfiguration, IJobKey>() {
+        @Override public IJobKey apply(IJobConfiguration job) {
+          return job.getKey();
+        }
+      };
+
+  /** Extracts the role component of a job key. */
+  public static final Function<IJobKey, String> TO_ROLE =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getRole();
+        }
+      };
+
+  /** Extracts the environment component of a job key. */
+  public static final Function<IJobKey, String> TO_ENVIRONMENT =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getEnvironment();
+        }
+      };
+
+  /** Extracts the name component of a job key. */
+  public static final Function<IJobKey, String> TO_JOB_NAME =
+      new Function<IJobKey, String>() {
+        @Override public String apply(IJobKey jobKey) {
+          return jobKey.getName();
+        }
+      };
+
+  /** Extracts the role of a job configuration's key (FROM_CONFIG followed by TO_ROLE). */
+  public static final Function<IJobConfiguration, String> CONFIG_TO_ROLE =
+      Functions.compose(TO_ROLE, FROM_CONFIG);
+
+  /**
+   * Check that a jobKey struct is valid.
+   *
+   * <p>A key is valid when it is non-null and its role, environment and name are all non-null
+   * and non-empty. Whitespace-only components are not rejected here.
+   *
+   * @param jobKey The jobKey to validate.
+   * @return {@code true} if the jobKey validates.
+   */
+  public static boolean isValid(@Nullable IJobKey jobKey) {
+    return jobKey != null
+        && !Strings.isNullOrEmpty(jobKey.getRole())
+        && !Strings.isNullOrEmpty(jobKey.getEnvironment())
+        && !Strings.isNullOrEmpty(jobKey.getName());
+  }
+
+  /**
+   * Assert that a jobKey struct is valid.
+   *
+   * @param jobKey The key struct to validate.
+   * @return The validated jobKey argument.
+   * @throws IllegalArgumentException if the key struct fails to validate; the message includes
+   *     the rejected key.
+   */
+  public static IJobKey assertValid(IJobKey jobKey) throws IllegalArgumentException {
+    // Include the offending key so callers surfacing the exception message get something useful.
+    checkArgument(isValid(jobKey), "Invalid job key: %s", jobKey);
+
+    return jobKey;
+  }
+
+  /**
+   * Attempt to create a valid JobKey from the given (role, environment, name) triple.
+   *
+   * @param role The job's role.
+   * @param environment The job's environment.
+   * @param name The job's name.
+   * @return A valid JobKey if it can be created.
+   * @throws IllegalArgumentException if the key fails to validate.
+   */
+  public static IJobKey from(String role, String environment, String name)
+      throws IllegalArgumentException {
+
+    IJobKey job = IJobKey.build(new JobKey()
+        .setRole(role)
+        .setEnvironment(environment)
+        .setName(name));
+    return assertValid(job);
+  }
+
+  /**
+   * Attempts to create a valid JobKey from the given task.
+   *
+   * @param task The task to create job key from.
+   * @return A valid JobKey if it can be created.
+   * @throws IllegalArgumentException if the key fails to validate.
+   */
+  public static IJobKey from(ITaskConfig task) throws IllegalArgumentException {
+    // NOTE(review): task.getOwner() is dereferenced directly; if it can be null this raises
+    // NullPointerException rather than IllegalArgumentException -- confirm owner is always set.
+    return from(task.getOwner().getRole(), task.getEnvironment(), task.getJobName());
+  }
+
+  /**
+   * Create a "/"-delimited String representation of a job key, suitable for logging but not
+   * necessarily suitable for use as a unique identifier.
+   *
+   * @param jobKey Key to represent.
+   * @return "/"-delimited representation of the key, as {@code role/environment/name}.
+   */
+  public static String toPath(IJobKey jobKey) {
+    return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
+  }
+
+  /**
+   * Create a "/"-delimited String representation of job key, suitable for logging but not
+   * necessarily suitable for use as a unique identifier.
+   *
+   * @param job Job to represent.
+   * @return "/"-delimited representation of the job's key.
+   */
+  public static String toPath(IJobConfiguration job) {
+    return toPath(job.getKey());
+  }
+
+  /**
+   * Attempt to extract a job key from the given query if it is scoped to a single job.
+   *
+   * @param query Query to extract the key from.
+   * @return A present if one can be extracted, absent otherwise.
+   * @throws IllegalArgumentException if the query is job-scoped but its role, environment or
+   *     name fail validation.
+   */
+  public static Optional<IJobKey> from(Query.Builder query) {
+    if (Query.isJobScoped(query)) {
+      TaskQuery taskQuery = query.get();
+      return Optional.of(
+          from(taskQuery.getOwner().getRole(), taskQuery.getEnvironment(), taskQuery.getJobName()));
+
+    } else {
+      return Optional.absent();
+    }
+  }
+}