You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 21:16:00 UTC
git commit: More updates.
Updated Branches:
refs/heads/wfarner/AURORA-139 ab5395b3e -> 13506c73d
More updates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/13506c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/13506c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/13506c73
Branch: refs/heads/wfarner/AURORA-139
Commit: 13506c73dca539644f5197f00aac96aa1ca2d4ad
Parents: ab5395b
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 29 12:15:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 29 12:15:49 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/AsyncModule.java | 6 ++++-
.../aurora/scheduler/async/Preemptor.java | 20 +++++++++++-----
.../aurora/scheduler/async/TaskScheduler.java | 25 +++++++++-----------
.../events/NotifyingSchedulingFilter.java | 11 +++++++--
4 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 72d3621..430ff03 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -49,6 +49,7 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat
import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.CachedJobState;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -130,7 +131,10 @@ public class AsyncModule extends AbstractModule {
private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
- @Override public Optional<String> findPreemptionSlotFor(String taskId) {
+ @Override public Optional<String> findPreemptionSlotFor(
+ String taskId,
+ CachedJobState cachedJobState) {
+
return Optional.absent();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index c11f483..b59678e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -48,6 +48,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.CachedJobState;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
@@ -77,7 +78,7 @@ public interface Preemptor {
* @param taskId ID of the preempting task.
* @return ID of the slave where preemption occurred.
*/
- Optional<String> findPreemptionSlotFor(String taskId);
+ Optional<String> findPreemptionSlotFor(String taskId, CachedJobState cachedJobState);
/**
* A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
@@ -215,7 +216,8 @@ public interface Preemptor {
private Optional<Set<IAssignedTask>> getTasksToPreempt(
Iterable<IAssignedTask> possibleVictims,
Iterable<Offer> offers,
- IAssignedTask pendingTask) {
+ IAssignedTask pendingTask,
+ CachedJobState cachedJobState) {
// This enforces the precondition that all of the resources are from the same host. We need to
// get the host for the schedulingFilter.
@@ -233,7 +235,8 @@ public interface Preemptor {
slackResources,
host,
pendingTask.getTask(),
- pendingTask.getTaskId());
+ pendingTask.getTaskId(),
+ cachedJobState);
if (vetos.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
@@ -262,7 +265,8 @@ public interface Preemptor {
totalResource,
host,
pendingTask.getTask(),
- pendingTask.getTaskId());
+ pendingTask.getTaskId(),
+ cachedJobState);
if (vetos.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -290,7 +294,10 @@ public interface Preemptor {
}
@Override
- public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
+ public synchronized Optional<String> findPreemptionSlotFor(
+ String taskId,
+ CachedJobState cachedJobState) {
+
List<IAssignedTask> pendingTasks =
fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
@@ -322,7 +329,8 @@ public interface Preemptor {
Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
slavesToActiveTasks.get(slaveID),
slavesToOffers.get(slaveID),
- pendingTask);
+ pendingTask,
+ cachedJobState);
if (toPreemptTasks.isPresent()) {
for (IAssignedTask toPreempt : toPreemptTasks.get()) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 75843ad..f7aedc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -53,7 +53,6 @@ import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.SlaveID;
@@ -132,18 +131,11 @@ interface TaskScheduler extends EventSubscriber {
this.reservations = new Reservations(reservationDuration, clock);
}
- private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
- EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
- TaskStore taskStore,
+ final CachedJobState cachedJobState,
final String taskId,
final IScheduledTask task) {
- final CachedJobState cachedJobState = new CachedJobState(taskStore.fetchTasks(
- Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
- .byStatus(ACTIVE_NOT_PENDING_STATES)));
-
return new Function<Offer, Optional<TaskInfo>>() {
@Override public Optional<TaskInfo> apply(Offer offer) {
Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
@@ -167,6 +159,9 @@ interface TaskScheduler extends EventSubscriber {
static final Optional<String> LAUNCH_FAILED_MSG =
Optional.of("Unknown exception attempting to schedule task.");
+ private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+ EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
@Timed("task_schedule_attempt")
@Override
public TaskSchedulerResult schedule(final String taskId) {
@@ -181,11 +176,13 @@ interface TaskScheduler extends EventSubscriber {
if (task == null) {
LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
} else {
+ final CachedJobState cachedJobState = new CachedJobState(store.getTaskStore()
+ .fetchTasks(Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+ .byStatus(ACTIVE_NOT_PENDING_STATES)));
try {
- if (!offerQueue.launchFirst(
- getAssignerFunction(store.getTaskStore(), taskId, task))) {
+ if (!offerQueue.launchFirst(getAssignerFunction(cachedJobState, taskId, task))) {
// Task could not be scheduled.
- maybePreemptFor(taskId);
+ maybePreemptFor(taskId, cachedJobState);
return TaskSchedulerResult.TRY_AGAIN;
}
} catch (OfferQueue.LaunchException e) {
@@ -213,11 +210,11 @@ interface TaskScheduler extends EventSubscriber {
}
}
- private void maybePreemptFor(String taskId) {
+ private void maybePreemptFor(String taskId, CachedJobState cachedJobState) {
if (reservations.hasReservationForTask(taskId)) {
return;
}
- Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+ Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId, cachedJobState);
if (slaveId.isPresent()) {
this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index c7f4a1b..d2be7ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -25,6 +25,7 @@ import com.google.inject.BindingAnnotation;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.CachedJobState;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -60,8 +61,14 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
}
@Override
- public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
- Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId);
+ public Set<Veto> filter(
+ ResourceSlot offer,
+ String slaveHost,
+ ITaskConfig task,
+ String taskId,
+ CachedJobState jobState) {
+
+ Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
if (!vetoes.isEmpty()) {
eventSink.post(new Vetoed(taskId, vetoes));
}