You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 21:16:00 UTC

git commit: More updates.

Updated Branches:
  refs/heads/wfarner/AURORA-139 ab5395b3e -> 13506c73d


More updates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/13506c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/13506c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/13506c73

Branch: refs/heads/wfarner/AURORA-139
Commit: 13506c73dca539644f5197f00aac96aa1ca2d4ad
Parents: ab5395b
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 29 12:15:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 29 12:15:49 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  6 ++++-
 .../aurora/scheduler/async/Preemptor.java       | 20 +++++++++++-----
 .../aurora/scheduler/async/TaskScheduler.java   | 25 +++++++++-----------
 .../events/NotifyingSchedulingFilter.java       | 11 +++++++--
 4 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 72d3621..430ff03 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -49,6 +49,7 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -130,7 +131,10 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
 
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override public Optional<String> findPreemptionSlotFor(String taskId) {
+    @Override public Optional<String> findPreemptionSlotFor(
+        String taskId,
+        CachedJobState cachedJobState) {
+
       return Optional.absent();
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index c11f483..b59678e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -48,6 +48,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -77,7 +78,7 @@ public interface Preemptor {
    * @param taskId ID of the preempting task.
    * @return ID of the slave where preemption occured.
    */
-  Optional<String> findPreemptionSlotFor(String taskId);
+  Optional<String> findPreemptionSlotFor(String taskId, CachedJobState cachedJobState);
 
   /**
    * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
@@ -215,7 +216,8 @@ public interface Preemptor {
     private Optional<Set<IAssignedTask>> getTasksToPreempt(
         Iterable<IAssignedTask> possibleVictims,
         Iterable<Offer> offers,
-        IAssignedTask pendingTask) {
+        IAssignedTask pendingTask,
+        CachedJobState cachedJobState) {
 
       // This enforces the precondition that all of the resources are from the same host. We need to
       // get the host for the schedulingFilter.
@@ -233,7 +235,8 @@ public interface Preemptor {
             slackResources,
             host,
             pendingTask.getTask(),
-            pendingTask.getTaskId());
+            pendingTask.getTaskId(),
+            cachedJobState);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
@@ -262,7 +265,8 @@ public interface Preemptor {
             totalResource,
             host,
             pendingTask.getTask(),
-            pendingTask.getTaskId());
+            pendingTask.getTaskId(),
+            cachedJobState);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -290,7 +294,10 @@ public interface Preemptor {
     }
 
     @Override
-    public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
+    public synchronized Optional<String> findPreemptionSlotFor(
+        String taskId,
+        CachedJobState cachedJobState) {
+
       List<IAssignedTask> pendingTasks =
           fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
 
@@ -322,7 +329,8 @@ public interface Preemptor {
         Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
             slavesToActiveTasks.get(slaveID),
             slavesToOffers.get(slaveID),
-            pendingTask);
+            pendingTask,
+            cachedJobState);
 
         if (toPreemptTasks.isPresent()) {
           for (IAssignedTask toPreempt : toPreemptTasks.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 75843ad..f7aedc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -53,7 +53,6 @@ import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.SlaveID;
@@ -132,18 +131,11 @@ interface TaskScheduler extends EventSubscriber {
       this.reservations = new Reservations(reservationDuration, clock);
     }
 
-    private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
-        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
     private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
-        TaskStore taskStore,
+        final CachedJobState cachedJobState,
         final String taskId,
         final IScheduledTask task) {
 
-      final CachedJobState cachedJobState = new CachedJobState(taskStore.fetchTasks(
-          Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
-              .byStatus(ACTIVE_NOT_PENDING_STATES)));
-
       return new Function<Offer, Optional<TaskInfo>>() {
         @Override public Optional<TaskInfo> apply(Offer offer) {
           Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
@@ -167,6 +159,9 @@ interface TaskScheduler extends EventSubscriber {
     static final Optional<String> LAUNCH_FAILED_MSG =
         Optional.of("Unknown exception attempting to schedule task.");
 
+    private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
     @Timed("task_schedule_attempt")
     @Override
     public TaskSchedulerResult schedule(final String taskId) {
@@ -181,11 +176,13 @@ interface TaskScheduler extends EventSubscriber {
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
+              final CachedJobState cachedJobState = new CachedJobState(store.getTaskStore()
+                  .fetchTasks(Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+                      .byStatus(ACTIVE_NOT_PENDING_STATES)));
               try {
-                if (!offerQueue.launchFirst(
-                    getAssignerFunction(store.getTaskStore(), taskId, task))) {
+                if (!offerQueue.launchFirst(getAssignerFunction(cachedJobState, taskId, task))) {
                   // Task could not be scheduled.
-                  maybePreemptFor(taskId);
+                  maybePreemptFor(taskId, cachedJobState);
                   return TaskSchedulerResult.TRY_AGAIN;
                 }
               } catch (OfferQueue.LaunchException e) {
@@ -213,11 +210,11 @@ interface TaskScheduler extends EventSubscriber {
       }
     }
 
-    private void maybePreemptFor(String taskId) {
+    private void maybePreemptFor(String taskId, CachedJobState cachedJobState) {
       if (reservations.hasReservationForTask(taskId)) {
         return;
       }
-      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId, cachedJobState);
       if (slaveId.isPresent()) {
         this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/13506c73/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index c7f4a1b..d2be7ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -25,6 +25,7 @@ import com.google.inject.BindingAnnotation;
 
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -60,8 +61,14 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   }
 
   @Override
-  public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
-    Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId);
+  public Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState jobState) {
+
+    Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
     if (!vetoes.isEmpty()) {
       eventSink.post(new Vetoed(taskId, vetoes));
     }