You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/17 04:22:22 UTC
git commit: Fix deadlock caused when Preemptor tries to acquire the
SchedulerCoreImpl intrinsic lock.
Updated Branches:
refs/heads/master ba25e6617 -> 9dd46666c
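Fix deadlock caused when Preemptor tries to acquire the SchedulerCoreImpl intrinsic lock.
Preemptor now calls StateManager.changeState directly, and the synchronized SchedulerCore.preemptTask method is removed.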
Bugs closed: AURORA-50
Reviewed at https://reviews.apache.org/r/17028/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9dd46666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9dd46666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9dd46666
Branch: refs/heads/master
Commit: 9dd46666c3d666347f68d27fb528c8927732d5c1
Parents: ba25e66
Author: Bill Farner <wf...@apache.org>
Authored: Thu Jan 16 19:22:00 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Jan 16 19:22:00 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/Preemptor.java | 35 ++++++---------
.../aurora/scheduler/state/SchedulerCore.java | 10 -----
.../scheduler/state/SchedulerCoreImpl.java | 14 ------
.../scheduler/async/PreemptorImplTest.java | 46 ++++++++++----------
4 files changed, 37 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index b190a00..f344cb7 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -22,8 +22,6 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import javax.inject.Inject;
@@ -46,12 +44,12 @@ import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
+import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.SchedulerCore;
+import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -103,8 +101,6 @@ public interface Preemptor {
static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
- private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
-
private static final Function<IAssignedTask, Integer> GET_PRIORITY =
new Function<IAssignedTask, Integer>() {
@Override public Integer apply(IAssignedTask task) {
@@ -113,7 +109,6 @@ public interface Preemptor {
};
private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
- private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
// Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
// Incremented every time we fail to find tasks to preempt for a pending task.
@@ -127,7 +122,7 @@ public interface Preemptor {
};
private final Storage storage;
- private final SchedulerCore scheduler;
+ private final StateManager stateManager;
private final OfferQueue offerQueue;
private final SchedulingFilter schedulingFilter;
private final Amount<Long, Time> preemptionCandidacyDelay;
@@ -137,8 +132,7 @@ public interface Preemptor {
* Creates a new preemptor.
*
* @param storage Backing store for tasks.
- * @param scheduler Scheduler to fetch task information from, and instruct when preempting
- * tasks.
+ * @param stateManager Scheduler state controller to instruct when preempting tasks.
* @param offerQueue Queue that contains available Mesos resource offers.
* @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
* @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
@@ -148,14 +142,14 @@ public interface Preemptor {
@Inject
PreemptorImpl(
Storage storage,
- SchedulerCore scheduler,
+ StateManager stateManager,
OfferQueue offerQueue,
SchedulingFilter schedulingFilter,
@PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
Clock clock) {
this.storage = checkNotNull(storage);
- this.scheduler = checkNotNull(scheduler);
+ this.stateManager = checkNotNull(stateManager);
this.offerQueue = checkNotNull(offerQueue);
this.schedulingFilter = checkNotNull(schedulingFilter);
this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
@@ -338,16 +332,15 @@ public interface Preemptor {
pendingTask);
if (toPreemptTasks.isPresent()) {
- try {
- for (IAssignedTask toPreempt : toPreemptTasks.get()) {
- scheduler.preemptTask(toPreempt, pendingTask);
- tasksPreempted.incrementAndGet();
- }
- return Optional.of(slaveID);
- } catch (ScheduleException e) {
- LOG.log(Level.SEVERE, "Preemption failed", e);
- failedPreemptions.incrementAndGet();
+ for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+ stateManager.changeState(
+ toPreempt.getTaskId(),
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.PREEMPTING,
+ Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
+ tasksPreempted.incrementAndGet();
}
+ return Optional.of(slaveID);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 7a88629..15ff590 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -25,7 +25,6 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -110,15 +109,6 @@ public interface SchedulerCore {
throws ScheduleException;
/**
- * Preempts a task in favor of another.
- *
- * @param task Task being preempted.
- * @param preemptingTask Task we are preempting in favor of.
- * @throws ScheduleException If a problem occurs while trying to perform the preemption.
- */
- void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) throws ScheduleException;
-
- /**
* Indicates to the scheduler that tasks were deleted on the assigned host.
*
* @param taskIds IDs of tasks that were deleted.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index b3f19cc..8ad24f9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -48,7 +48,6 @@ import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -350,17 +349,4 @@ class SchedulerCoreImpl implements SchedulerCore {
}
});
}
-
- @Override
- public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
- checkNotNull(task);
- checkNotNull(preemptingTask);
- // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
-
- stateManager.changeState(
- task.getTaskId(),
- Optional.<ScheduleStatus>absent(),
- ScheduleStatus.PREEMPTING,
- Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9dd46666/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index c09f845..025294f 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -47,8 +47,7 @@ import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.SchedulerCore;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -65,6 +64,7 @@ import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import static org.apache.mesos.Protos.Offer;
import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
public class PreemptorImplTest extends EasyMockTest {
@@ -75,15 +75,12 @@ public class PreemptorImplTest extends EasyMockTest {
private static final String JOB_A = "job_a";
private static final String JOB_B = "job_b";
private static final String JOB_C = "job_c";
- private static final String JOB_D = "job_d";
private static final String TASK_ID_A = "task_a";
private static final String TASK_ID_B = "task_b";
private static final String TASK_ID_C = "task_c";
private static final String TASK_ID_D = "task_d";
private static final String HOST_A = "host_a";
- private static final String HOST_B = "host_b";
private static final String RACK_A = "rackA";
- private static final String RACK_B = "rackB";
private static final String RACK_ATTRIBUTE = "rack";
private static final String HOST_ATTRIBUTE = "host";
private static final String OFFER_A = "offer_a";
@@ -91,7 +88,7 @@ public class PreemptorImplTest extends EasyMockTest {
private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
private StorageTestUtil storageUtil;
- private SchedulerCore scheduler;
+ private StateManager stateManager;
private SchedulingFilter schedulingFilter;
private FakeClock clock;
private MaintenanceController maintenance;
@@ -101,7 +98,7 @@ public class PreemptorImplTest extends EasyMockTest {
public void setUp() {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
- scheduler = createMock(SchedulerCore.class);
+ stateManager = createMock(StateManager.class);
maintenance = createMock(MaintenanceController.class);
clock = new FakeClock();
offerQueue = createMock(OfferQueue.class);
@@ -110,7 +107,7 @@ public class PreemptorImplTest extends EasyMockTest {
private void runPreemptor(ScheduledTask pendingTask) {
PreemptorImpl preemptor = new PreemptorImpl(
storageUtil.storage,
- scheduler,
+ stateManager,
offerQueue,
schedulingFilter,
PREEMPTION_DELAY,
@@ -151,7 +148,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(lowPriority);
expectFiltering();
- expectPreempted(lowPriority, highPriority);
+ expectPreempted(lowPriority);
control.replay();
runPreemptor(highPriority);
@@ -175,7 +172,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(lowerPriority, lowerPriority);
expectFiltering();
- expectPreempted(lowerPriority, highPriority);
+ expectPreempted(lowerPriority);
control.replay();
runPreemptor(highPriority);
@@ -202,7 +199,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
expectFiltering();
- expectPreempted(lowestPriority, pendingPriority);
+ expectPreempted(lowestPriority);
control.replay();
runPreemptor(pendingPriority);
@@ -242,7 +239,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(a1);
expectFiltering();
- expectPreempted(a1, p1);
+ expectPreempted(a1);
control.replay();
runPreemptor(p1);
@@ -264,7 +261,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(a1);
expectFiltering();
- expectPreempted(a1, p1);
+ expectPreempted(a1);
control.replay();
runPreemptor(p1);
@@ -313,8 +310,8 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(a1, b1);
- expectPreempted(a1, p1);
- expectPreempted(b1, p1);
+ expectPreempted(a1);
+ expectPreempted(b1);
control.replay();
runPreemptor(p1);
@@ -349,7 +346,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(b1, b2, a1);
- expectPreempted(a1, p1);
+ expectPreempted(a1);
control.replay();
runPreemptor(p1);
@@ -402,7 +399,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(a1);
expectGetPendingTasks(p1);
- expectPreempted(a1, p1);
+ expectPreempted(a1);
control.replay();
runPreemptor(p1);
@@ -434,8 +431,8 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetActiveTasks(a1, a2);
expectGetPendingTasks(p1);
- expectPreempted(a1, p1);
- expectPreempted(a2, p1);
+ expectPreempted(a1);
+ expectPreempted(a2);
control.replay();
runPreemptor(p1);
@@ -509,10 +506,13 @@ public class PreemptorImplTest extends EasyMockTest {
);
}
- private void expectPreempted(ScheduledTask preempted, ScheduledTask preempting) throws Exception {
- scheduler.preemptTask(
- IAssignedTask.build(preempted.getAssignedTask()),
- IAssignedTask.build(preempting.getAssignedTask()));
+ private void expectPreempted(ScheduledTask preempted) throws Exception {
+ expect(stateManager.changeState(
+ eq(Tasks.id(preempted)),
+ eq(Optional.<ScheduleStatus>absent()),
+ eq(ScheduleStatus.PREEMPTING),
+ EasyMock.<Optional<String>>anyObject()))
+ .andReturn(true);
}
private ScheduledTask makeProductionTask(String role, String job, String taskId) {