You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/17 04:22:22 UTC

git commit: Fix deadlock caused when Preemptor tries to acquire the SchedulerCoreImpl intrinsic lock.

Updated Branches:
  refs/heads/master ba25e6617 -> 9dd46666c


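Fix deadlock caused when Preemptor tries to acquire the SchedulerCoreImpl intrinsic lock.
Preemptor now moves tasks to PREEMPTING directly via StateManager.changeState instead of calling the synchronized SchedulerCore.preemptTask, which has been removed.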

Bugs closed: AURORA-50

Reviewed at https://reviews.apache.org/r/17028/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9dd46666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9dd46666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9dd46666

Branch: refs/heads/master
Commit: 9dd46666c3d666347f68d27fb528c8927732d5c1
Parents: ba25e66
Author: Bill Farner <wf...@apache.org>
Authored: Thu Jan 16 19:22:00 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Jan 16 19:22:00 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/Preemptor.java       | 35 ++++++---------
 .../aurora/scheduler/state/SchedulerCore.java   | 10 -----
 .../scheduler/state/SchedulerCoreImpl.java      | 14 ------
 .../scheduler/async/PreemptorImplTest.java      | 46 ++++++++++----------
 4 files changed, 37 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index b190a00..f344cb7 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -22,8 +22,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
@@ -46,12 +44,12 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.SchedulerCore;
+import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -103,8 +101,6 @@ public interface Preemptor {
     static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
         EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
 
-    private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
-
     private static final Function<IAssignedTask, Integer> GET_PRIORITY =
         new Function<IAssignedTask, Integer>() {
           @Override public Integer apply(IAssignedTask task) {
@@ -113,7 +109,6 @@ public interface Preemptor {
         };
 
     private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
-    private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
     // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
     private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
     // Incremented every time we fail to find tasks to preempt for a pending task.
@@ -127,7 +122,7 @@ public interface Preemptor {
     };
 
     private final Storage storage;
-    private final SchedulerCore scheduler;
+    private final StateManager stateManager;
     private final OfferQueue offerQueue;
     private final SchedulingFilter schedulingFilter;
     private final Amount<Long, Time> preemptionCandidacyDelay;
@@ -137,8 +132,7 @@ public interface Preemptor {
      * Creates a new preemptor.
      *
      * @param storage Backing store for tasks.
-     * @param scheduler Scheduler to fetch task information from, and instruct when preempting
-     *                  tasks.
+     * @param stateManager Scheduler state controller to instruct when preempting tasks.
      * @param offerQueue Queue that contains available Mesos resource offers.
      * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
      * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
@@ -148,14 +142,14 @@ public interface Preemptor {
     @Inject
     PreemptorImpl(
         Storage storage,
-        SchedulerCore scheduler,
+        StateManager stateManager,
         OfferQueue offerQueue,
         SchedulingFilter schedulingFilter,
         @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
         Clock clock) {
 
       this.storage = checkNotNull(storage);
-      this.scheduler = checkNotNull(scheduler);
+      this.stateManager = checkNotNull(stateManager);
       this.offerQueue = checkNotNull(offerQueue);
       this.schedulingFilter = checkNotNull(schedulingFilter);
       this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
@@ -338,16 +332,15 @@ public interface Preemptor {
             pendingTask);
 
         if (toPreemptTasks.isPresent()) {
-          try {
-            for (IAssignedTask toPreempt : toPreemptTasks.get()) {
-              scheduler.preemptTask(toPreempt, pendingTask);
-              tasksPreempted.incrementAndGet();
-            }
-            return Optional.of(slaveID);
-          } catch (ScheduleException e) {
-            LOG.log(Level.SEVERE, "Preemption failed", e);
-            failedPreemptions.incrementAndGet();
+          for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+            stateManager.changeState(
+                toPreempt.getTaskId(),
+                Optional.<ScheduleStatus>absent(),
+                ScheduleStatus.PREEMPTING,
+                Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
+            tasksPreempted.incrementAndGet();
           }
+          return Optional.of(slaveID);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 7a88629..15ff590 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -25,7 +25,6 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -110,15 +109,6 @@ public interface SchedulerCore {
       throws ScheduleException;
 
   /**
-   * Preempts a task in favor of another.
-   *
-   * @param task Task being preempted.
-   * @param preemptingTask Task we are preempting in favor of.
-   * @throws ScheduleException If a problem occurs while trying to perform the preemption.
-   */
-  void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) throws ScheduleException;
-
-  /**
    * Indicates to the scheduler that tasks were deleted on the assigned host.
    *
    * @param taskIds IDs of tasks that were deleted.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index b3f19cc..8ad24f9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -48,7 +48,6 @@ import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -350,17 +349,4 @@ class SchedulerCoreImpl implements SchedulerCore {
       }
     });
   }
-
-  @Override
-  public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
-    checkNotNull(task);
-    checkNotNull(preemptingTask);
-    // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
-
-    stateManager.changeState(
-        task.getTaskId(),
-        Optional.<ScheduleStatus>absent(),
-        ScheduleStatus.PREEMPTING,
-        Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index c09f845..025294f 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -47,8 +47,7 @@ import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.SchedulerCore;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -65,6 +64,7 @@ import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 
 public class PreemptorImplTest extends EasyMockTest {
@@ -75,15 +75,12 @@ public class PreemptorImplTest extends EasyMockTest {
   private static final String JOB_A = "job_a";
   private static final String JOB_B = "job_b";
   private static final String JOB_C = "job_c";
-  private static final String JOB_D = "job_d";
   private static final String TASK_ID_A = "task_a";
   private static final String TASK_ID_B = "task_b";
   private static final String TASK_ID_C = "task_c";
   private static final String TASK_ID_D = "task_d";
   private static final String HOST_A = "host_a";
-  private static final String HOST_B = "host_b";
   private static final String RACK_A = "rackA";
-  private static final String RACK_B = "rackB";
   private static final String RACK_ATTRIBUTE = "rack";
   private static final String HOST_ATTRIBUTE = "host";
   private static final String OFFER_A = "offer_a";
@@ -91,7 +88,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
 
   private StorageTestUtil storageUtil;
-  private SchedulerCore scheduler;
+  private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
   private MaintenanceController maintenance;
@@ -101,7 +98,7 @@ public class PreemptorImplTest extends EasyMockTest {
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
-    scheduler = createMock(SchedulerCore.class);
+    stateManager = createMock(StateManager.class);
     maintenance = createMock(MaintenanceController.class);
     clock = new FakeClock();
     offerQueue = createMock(OfferQueue.class);
@@ -110,7 +107,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private void runPreemptor(ScheduledTask pendingTask) {
     PreemptorImpl preemptor = new PreemptorImpl(
         storageUtil.storage,
-        scheduler,
+        stateManager,
         offerQueue,
         schedulingFilter,
         PREEMPTION_DELAY,
@@ -151,7 +148,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(lowPriority);
 
     expectFiltering();
-    expectPreempted(lowPriority, highPriority);
+    expectPreempted(lowPriority);
 
     control.replay();
     runPreemptor(highPriority);
@@ -175,7 +172,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(lowerPriority, lowerPriority);
 
     expectFiltering();
-    expectPreempted(lowerPriority, highPriority);
+    expectPreempted(lowerPriority);
 
     control.replay();
     runPreemptor(highPriority);
@@ -202,7 +199,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
 
     expectFiltering();
-    expectPreempted(lowestPriority, pendingPriority);
+    expectPreempted(lowestPriority);
 
     control.replay();
     runPreemptor(pendingPriority);
@@ -242,7 +239,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(a1);
 
     expectFiltering();
-    expectPreempted(a1, p1);
+    expectPreempted(a1);
 
     control.replay();
     runPreemptor(p1);
@@ -264,7 +261,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(a1);
 
     expectFiltering();
-    expectPreempted(a1, p1);
+    expectPreempted(a1);
 
     control.replay();
     runPreemptor(p1);
@@ -313,8 +310,8 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1, b1);
 
-    expectPreempted(a1, p1);
-    expectPreempted(b1, p1);
+    expectPreempted(a1);
+    expectPreempted(b1);
 
     control.replay();
     runPreemptor(p1);
@@ -349,7 +346,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(b1, b2, a1);
 
-    expectPreempted(a1, p1);
+    expectPreempted(a1);
 
     control.replay();
     runPreemptor(p1);
@@ -402,7 +399,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(a1);
     expectGetPendingTasks(p1);
 
-    expectPreempted(a1, p1);
+    expectPreempted(a1);
 
     control.replay();
     runPreemptor(p1);
@@ -434,8 +431,8 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetActiveTasks(a1, a2);
     expectGetPendingTasks(p1);
 
-    expectPreempted(a1, p1);
-    expectPreempted(a2, p1);
+    expectPreempted(a1);
+    expectPreempted(a2);
 
     control.replay();
     runPreemptor(p1);
@@ -509,10 +506,13 @@ public class PreemptorImplTest extends EasyMockTest {
     );
   }
 
-  private void expectPreempted(ScheduledTask preempted, ScheduledTask preempting) throws Exception {
-    scheduler.preemptTask(
-        IAssignedTask.build(preempted.getAssignedTask()),
-        IAssignedTask.build(preempting.getAssignedTask()));
+  private void expectPreempted(ScheduledTask preempted) throws Exception {
+    expect(stateManager.changeState(
+        eq(Tasks.id(preempted)),
+        eq(Optional.<ScheduleStatus>absent()),
+        eq(ScheduleStatus.PREEMPTING),
+        EasyMock.<Optional<String>>anyObject()))
+        .andReturn(true);
   }
 
   private ScheduledTask makeProductionTask(String role, String job, String taskId) {