You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/14 03:35:05 UTC
[1/2] incubator-aurora git commit: Require StateManager callers to
open their own transactions.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 316f291e3 -> e9f135dc0
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 4170902..d1bc9bf 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -70,6 +70,7 @@ import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -289,6 +290,18 @@ public class TaskSchedulerTest extends EasyMockTest {
timeoutCapture.getValue().run();
}
+ private IExpectationSetters<Optional<TaskInfo>> expectMaybeAssign(
+ HostOffer offer,
+ IScheduledTask task,
+ AttributeAggregate jobAggregate) {
+
+ return expect(assigner.maybeAssign(
+ EasyMock.<MutableStoreProvider>anyObject(),
+ eq(offer),
+ eq(task),
+ eq(jobAggregate)));
+ }
+
@Test
public void testTaskAssigned() {
expectAnyMaintenanceCalls();
@@ -298,11 +311,11 @@ public class TaskSchedulerTest extends EasyMockTest {
TaskInfo mesosTask = makeTaskInfo(task);
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -333,14 +346,15 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
expectAnyMaintenanceCalls();
expectOfferDeclineIn(10);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
expect(stateManager.changeState(
- "a",
- Optional.of(PENDING),
- LOST,
- TaskSchedulerImpl.LAUNCH_FAILED_MSG))
+ EasyMock.<MutableStoreProvider>anyObject(),
+ eq("a"),
+ eq(Optional.of(PENDING)),
+ eq(LOST),
+ eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG)))
.andReturn(true);
replayAndCreateScheduler();
@@ -362,11 +376,10 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
expectAnyMaintenanceCalls();
expectOfferDeclineIn(10);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob))
- .andThrow(new StorageException("Injected failure."));
+ expectMaybeAssign(OFFER_A, task, emptyJob).andThrow(new StorageException("Injected failure."));
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall();
@@ -385,7 +398,7 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
expectAnyMaintenanceCalls();
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
driver.declineOffer(OFFER_A.getOffer().getId());
@@ -447,13 +460,13 @@ public class TaskSchedulerTest extends EasyMockTest {
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expect(assigner.maybeAssign(OFFER_A, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
+ expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Optional.of(mesosTaskA));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
IScheduledTask taskB = makeTask("B", PENDING);
TaskInfo mesosTaskB = makeTaskInfo(taskB);
- expect(assigner.maybeAssign(OFFER_B, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
+ expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Optional.of(mesosTaskB));
driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -479,7 +492,7 @@ public class TaskSchedulerTest extends EasyMockTest {
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expect(assigner.maybeAssign(OFFER_B, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
+ expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Optional.of(mesosTaskA));
driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -488,7 +501,7 @@ public class TaskSchedulerTest extends EasyMockTest {
HostOffer updatedOfferC = new HostOffer(
OFFER_C.getOffer(),
IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
- expect(assigner.maybeAssign(updatedOfferC, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
+ expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Optional.of(mesosTaskB));
driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -515,6 +528,7 @@ public class TaskSchedulerTest extends EasyMockTest {
TaskInfo mesosTask = makeTaskInfo(task);
Capture<IScheduledTask> taskScheduled = createCapture();
expect(assigner.maybeAssign(
+ EasyMock.<MutableStoreProvider>anyObject(),
EasyMock.<HostOffer>anyObject(),
capture(taskScheduled),
EasyMock.eq(emptyJob)))
@@ -583,7 +597,7 @@ public class TaskSchedulerTest extends EasyMockTest {
final IScheduledTask task = makeTask("a", PENDING);
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
+ expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
index a28e512..a637101 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
@@ -31,6 +31,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.junit.Before;
import org.junit.Test;
@@ -48,6 +49,7 @@ public class TaskThrottlerTest extends EasyMockTest {
private RescheduleCalculator rescheduleCalculator;
private FakeClock clock;
private ScheduledExecutorService executor;
+ private StorageTestUtil storageUtil;
private StateManager stateManager;
private TaskThrottler throttler;
@@ -56,8 +58,15 @@ public class TaskThrottlerTest extends EasyMockTest {
rescheduleCalculator = createMock(RescheduleCalculator.class);
clock = new FakeClock();
executor = createMock(ScheduledExecutorService.class);
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
- throttler = new TaskThrottler(rescheduleCalculator, clock, executor, stateManager);
+ throttler = new TaskThrottler(
+ rescheduleCalculator,
+ clock,
+ executor,
+ storageUtil.storage,
+ stateManager);
}
@Test
@@ -116,6 +125,7 @@ public class TaskThrottlerTest extends EasyMockTest {
private void expectMovedToPending(IScheduledTask task) {
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
Tasks.id(task),
Optional.of(THROTTLED),
PENDING,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 17295ac..88fc172 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -33,6 +33,7 @@ import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -60,6 +61,7 @@ public class TaskTimeoutTest extends EasyMockTest {
private AtomicLong timedOutTaskCounter;
private ScheduledExecutorService executor;
+ private StorageTestUtil storageUtil;
private ScheduledFuture<?> future;
private StateManager stateManager;
private FakeClock clock;
@@ -69,6 +71,8 @@ public class TaskTimeoutTest extends EasyMockTest {
@Before
public void setUp() {
executor = createMock(ScheduledExecutorService.class);
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
future = createMock(new Clazz<ScheduledFuture<?>>() { });
stateManager = createMock(StateManager.class);
clock = new FakeClock();
@@ -80,7 +84,12 @@ public class TaskTimeoutTest extends EasyMockTest {
private void replayAndCreate() {
control.replay();
- timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+ timeout = new TaskTimeout(
+ executor,
+ storageUtil.storage,
+ stateManager,
+ TIMEOUT,
+ statsProvider);
timeout.startAsync().awaitRunning();
}
@@ -129,6 +138,7 @@ public class TaskTimeoutTest extends EasyMockTest {
expectTaskWatch();
Capture<Runnable> killingTimeout = expectTaskWatch();
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.of(KILLING),
LOST,
@@ -146,6 +156,7 @@ public class TaskTimeoutTest extends EasyMockTest {
public void testTimeout() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.of(ASSIGNED),
LOST,
@@ -164,6 +175,7 @@ public class TaskTimeoutTest extends EasyMockTest {
public void testTaskDeleted() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.of(KILLING),
LOST,
@@ -221,7 +233,7 @@ public class TaskTimeoutTest extends EasyMockTest {
expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY);
control.replay();
- timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+ timeout = new TaskTimeout(executor, storageUtil.storage, stateManager, TIMEOUT, statsProvider);
changeState(INIT, PENDING);
changeState(PENDING, ASSIGNED);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index b6b1bcb..c99135a 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -38,6 +38,8 @@ import org.junit.Before;
import org.junit.Test;
import org.quartz.JobExecutionException;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertFalse;
@@ -47,7 +49,6 @@ public class AuroraCronJobTest extends EasyMockTest {
public static final String TASK_ID = "A";
private Storage storage;
private StateManager stateManager;
- private CronJobManager cronJobManager;
private BackoffHelper backoffHelper;
private AuroraCronJob auroraCronJob;
@@ -58,7 +59,7 @@ public class AuroraCronJobTest extends EasyMockTest {
public void setUp() {
storage = MemStorage.newEmptyStorage();
stateManager = createMock(StateManager.class);
- cronJobManager = createMock(CronJobManager.class);
+ CronJobManager cronJobManager = createMock(CronJobManager.class);
backoffHelper = createMock(BackoffHelper.class);
auroraCronJob = new AuroraCronJob(
@@ -78,7 +79,7 @@ public class AuroraCronJobTest extends EasyMockTest {
control.replay();
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getJobStore().saveAcceptedJob(
MANAGER_ID,
IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder().setCronSchedule(null)));
@@ -91,6 +92,7 @@ public class AuroraCronJobTest extends EasyMockTest {
@Test
public void testEmptyStorage() throws JobExecutionException {
stateManager.insertPendingTasks(
+ EasyMock.<MutableStoreProvider>anyObject(),
EasyMock.<ITaskConfig>anyObject(),
EasyMock.<Set<Integer>>anyObject());
expectLastCall().times(3);
@@ -122,13 +124,15 @@ public class AuroraCronJobTest extends EasyMockTest {
Capture<Supplier<Boolean>> capture = createCapture();
expect(stateManager.changeState(
- TASK_ID,
- Optional.<ScheduleStatus>absent(),
- ScheduleStatus.KILLING,
- AuroraCronJob.KILL_AUDIT_MESSAGE))
+ EasyMock.<MutableStoreProvider>anyObject(),
+ eq(TASK_ID),
+ eq(Optional.<ScheduleStatus>absent()),
+ eq(ScheduleStatus.KILLING),
+ eq(AuroraCronJob.KILL_AUDIT_MESSAGE)))
.andReturn(true);
backoffHelper.doUntilSuccess(EasyMock.capture(capture));
stateManager.insertPendingTasks(
+ EasyMock.<MutableStoreProvider>anyObject(),
EasyMock.<ITaskConfig>anyObject(),
EasyMock.<Set<Integer>>anyObject());
@@ -141,7 +145,7 @@ public class AuroraCronJobTest extends EasyMockTest {
storage.write(
new Storage.MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().deleteAllTasks();
}
});
@@ -151,7 +155,7 @@ public class AuroraCronJobTest extends EasyMockTest {
private void populateTaskStore() {
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
IScheduledTask.build(new ScheduledTask()
.setStatus(ScheduleStatus.RUNNING)
@@ -166,7 +170,7 @@ public class AuroraCronJobTest extends EasyMockTest {
private void populateStorage(final CronCollisionPolicy policy) {
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
- public void execute(Storage.MutableStoreProvider storeProvider) {
+ public void execute(MutableStoreProvider storeProvider) {
storeProvider.getJobStore().saveAcceptedJob(
MANAGER_ID,
QuartzTestUtil.makeSanitizedCronJob(policy).getSanitizedConfig().getJobConfig());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index 4739909..bd031a5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -102,6 +102,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
expectMaintenanceModeChange(HOST_A, SCHEDULED);
expectFetchTasksByHost(HOST_A, ImmutableSet.of(task));
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
Tasks.id(task),
Optional.<ScheduleStatus>absent(),
ScheduleStatus.DRAINING,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index c602cbd..157921c 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -36,7 +36,6 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
@@ -45,7 +44,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
@@ -77,7 +75,6 @@ public class StateManagerImplTest extends EasyMockTest {
private static final String HOST_A = "host_a";
private static final Identity JIM = new Identity("jim", "jim-user");
private static final String MY_JOB = "myJob";
- private static final IJobKey JOB_KEY = JobKeys.from(JIM.getRole(), "devel", MY_JOB);
private Driver driver;
private TaskIdGenerator taskIdGenerator;
@@ -96,7 +93,6 @@ public class StateManagerImplTest extends EasyMockTest {
// TODO(William Farner): Use a mocked storage.
storage = MemStorage.newEmptyStorage();
stateManager = new StateManagerImpl(
- storage,
clock,
driver,
taskIdGenerator,
@@ -352,12 +348,12 @@ public class StateManagerImplTest extends EasyMockTest {
insertTask(task, 0);
assignTask(taskId, HOST_A);
- assertFalse(stateManager.changeState(
+ assertFalse(changeState(
taskId,
Optional.of(PENDING),
RUNNING,
Optional.<String>absent()));
- assertTrue(stateManager.changeState(
+ assertTrue(changeState(
taskId,
Optional.of(ASSIGNED),
FAILED,
@@ -368,7 +364,7 @@ public class StateManagerImplTest extends EasyMockTest {
public void testCasTaskNotFound() {
control.replay();
- assertFalse(stateManager.changeState(
+ assertFalse(changeState(
"a",
Optional.of(PENDING),
ASSIGNED,
@@ -378,7 +374,7 @@ public class StateManagerImplTest extends EasyMockTest {
@Test
public void testDeleteTasks() {
ITaskConfig task = makeTask(JIM, MY_JOB);
- String taskId = "a";
+ final String taskId = "a";
expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FINISHED);
eventSink.post(matchTasksDeleted(taskId));
@@ -389,7 +385,13 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(taskId, HOST_A);
changeState(taskId, RUNNING);
changeState(taskId, FINISHED);
- stateManager.deleteTasks(ImmutableSet.of(taskId));
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.deleteTasks(storeProvider, ImmutableSet.of(taskId));
+ }
+ });
+
}
@Test
@@ -448,7 +450,15 @@ public class StateManagerImplTest extends EasyMockTest {
@Test(expected = IllegalArgumentException.class)
public void insertEmptyPendingInstancesFails() {
control.replay();
- stateManager.insertPendingTasks(makeTask(JIM, MY_JOB), ImmutableSet.<Integer>of());
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.insertPendingTasks(
+ storeProvider,
+ makeTask(JIM, MY_JOB),
+ ImmutableSet.<Integer>of());
+ }
+ });
}
@Test(expected = IllegalArgumentException.class)
@@ -488,12 +498,36 @@ public class StateManagerImplTest extends EasyMockTest {
}
}
- private void insertTask(ITaskConfig task, int instanceId) {
- stateManager.insertPendingTasks(task, ImmutableSet.of(instanceId));
+ private void insertTask(final ITaskConfig task, final int instanceId) {
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.insertPendingTasks(storeProvider, task, ImmutableSet.of(instanceId));
+ }
+ });
+ }
+
+ private boolean changeState(
+ final String taskId,
+ final Optional<ScheduleStatus> casState,
+ final ScheduleStatus newState,
+ final Optional<String> auditMessage) {
+
+ return storage.write(new Storage.MutateWork.Quiet<Boolean>() {
+ @Override
+ public Boolean apply(Storage.MutableStoreProvider storeProvider) {
+ return stateManager.changeState(
+ storeProvider,
+ taskId,
+ casState,
+ newState,
+ auditMessage);
+ }
+ });
}
- private boolean changeState(String taskId, ScheduleStatus status) {
- return stateManager.changeState(
+ private boolean changeState(final String taskId, final ScheduleStatus status) {
+ return changeState(
taskId,
Optional.<ScheduleStatus>absent(),
status,
@@ -513,7 +547,17 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(taskId, host, ImmutableSet.<Integer>of());
}
- private void assignTask(String taskId, String host, Set<Integer> ports) {
- stateManager.assignTask(taskId, host, SlaveID.newBuilder().setValue(host).build(), ports);
+ private void assignTask(final String taskId, final String host, final Set<Integer> ports) {
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.assignTask(
+ storeProvider,
+ taskId,
+ host,
+ SlaveID.newBuilder().setValue(host).build(),
+ ports);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 8d69ae9..a8a70b6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -46,6 +46,7 @@ import org.apache.mesos.Protos.Value.Type;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -79,6 +80,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
.setSlaveId(MESOS_OFFER.getSlaveId())
.build();
+ private MutableStoreProvider storeProvider;
private StateManager stateManager;
private SchedulingFilter filter;
private MesosTaskFactory taskFactory;
@@ -87,6 +89,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
@Before
public void setUp() throws Exception {
+ storeProvider = createMock(MutableStoreProvider.class);
stateManager = createMock(StateManager.class);
filter = createMock(SchedulingFilter.class);
taskFactory = createMock(MesosTaskFactory.class);
@@ -106,6 +109,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
emptyJob))
.andReturn(ImmutableSet.<Veto>of());
expect(stateManager.assignTask(
+ storeProvider,
Tasks.id(TASK),
MESOS_OFFER.getHostname(),
MESOS_OFFER.getSlaveId(),
@@ -116,7 +120,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
+ assertEquals(
+ Optional.of(TASK_INFO),
+ assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
}
@Test
@@ -131,6 +137,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
+ assertEquals(
+ Optional.<TaskInfo>absent(),
+ assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 871ddbc..6089032 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -349,6 +349,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
.andReturn(ENOUGH_QUOTA);
stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
sanitized.getJobConfig().getTaskConfig(),
sanitized.getInstanceIds());
@@ -371,6 +372,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
.andReturn(ENOUGH_QUOTA);
stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
sanitized.getJobConfig().getTaskConfig(),
sanitized.getInstanceIds());
@@ -414,6 +416,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
.andReturn(ENOUGH_QUOTA);
stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
sanitized.getJobConfig().getTaskConfig(),
sanitized.getInstanceIds());
@@ -668,6 +671,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(quotaManager.checkInstanceAddition(ITaskConfig.build(sanitized.getTaskConfig()), 1))
.andReturn(ENOUGH_QUOTA);
stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
ITaskConfig.build(sanitized.getTaskConfig()),
ImmutableSet.of(0));
@@ -733,6 +737,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
private void expectTransitionsToKilling() {
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.<ScheduleStatus>absent(),
ScheduleStatus.KILLING,
@@ -986,6 +991,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
ScheduleStatus status = ScheduleStatus.FAILED;
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.<ScheduleStatus>absent(),
ScheduleStatus.FAILED,
@@ -1021,6 +1027,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
buildScheduledTask());
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID,
Optional.<ScheduleStatus>absent(),
ScheduleStatus.RESTARTING,
@@ -1972,7 +1979,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(taskIdGenerator.generate(populatedTask, 1))
.andReturn(TASK_ID);
expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
- stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
+ stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
+ populatedTask,
+ ImmutableSet.of(0));
control.replay();
@@ -1993,7 +2003,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(taskIdGenerator.generate(populatedTask, 1))
.andReturn(TASK_ID);
expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
- stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
+ stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
+ populatedTask,
+ ImmutableSet.of(0));
control.replay();
@@ -2112,7 +2125,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(taskIdGenerator.generate(populatedTask, 1))
.andReturn(TASK_ID);
expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
- stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
+ stateManager.insertPendingTasks(
+ storageUtil.mutableStoreProvider,
+ populatedTask,
+ ImmutableSet.of(0));
expectLastCall().andThrow(new IllegalArgumentException("instance collision"));
control.replay();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
index f9ed46f..2b255c4 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
@@ -21,12 +21,13 @@ import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
public class AddTaskTest extends EasyMockTest {
private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build(
new JobUpdateInstructions()
@@ -36,14 +37,14 @@ public class AddTaskTest extends EasyMockTest {
private static final IInstanceKey INSTANCE =
IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", "job").newBuilder(), 0));
- private TaskStore taskStore;
+ private MutableStoreProvider storeProvider;
private StateManager stateManager;
private InstanceActionHandler handler;
@Before
public void setUp() {
stateManager = createMock(StateManager.class);
- taskStore = createMock(TaskStore.class);
+ storeProvider = createMock(MutableStoreProvider.class);
handler = new InstanceActionHandler.AddTask();
}
@@ -54,7 +55,7 @@ public class AddTaskTest extends EasyMockTest {
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS,
- taskStore,
+ storeProvider,
stateManager,
JobUpdateStatus.ROLLING_BACK);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index f36e34c..2533d82 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.updater;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import com.google.common.base.Function;
@@ -222,19 +223,25 @@ public class JobUpdaterIT extends EasyMockTest {
}
private void changeState(
- IJobKey job,
- int instanceId,
+ final IJobKey job,
+ final int instanceId,
ScheduleStatus status,
ScheduleStatus... statuses) {
- for (ScheduleStatus s
+ for (final ScheduleStatus s
: ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
- assertTrue(stateManager.changeState(
- getTaskId(job, instanceId),
- Optional.<ScheduleStatus>absent(),
- s,
- Optional.<String>absent()));
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ assertTrue(stateManager.changeState(
+ storeProvider,
+ getTaskId(job, instanceId),
+ Optional.<ScheduleStatus>absent(),
+ s,
+ Optional.<String>absent()));
+ }
+ });
}
}
@@ -285,10 +292,24 @@ public class JobUpdaterIT extends EasyMockTest {
return expectLastCall();
}
- private void insertInitialTasks(IJobUpdate update) {
- for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) {
- stateManager.insertPendingTasks(config.getTask(), expandInstanceIds(ImmutableSet.of(config)));
- }
+ private void insertPendingTasks(final ITaskConfig task, final Set<Integer> instanceIds) {
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+ }
+ });
+ }
+
+ private void insertInitialTasks(final IJobUpdate update) {
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) {
+ insertPendingTasks(config.getTask(), expandInstanceIds(ImmutableSet.of(config)));
+ }
+ }
+ });
}
private void assertJobState(IJobKey job, Map<Integer, ITaskConfig> expected) {
@@ -343,7 +364,7 @@ public class JobUpdaterIT extends EasyMockTest {
assertState(ROLLING_FORWARD, actions.build());
// A task outside the scope of the update should be ignored by the updater.
- stateManager.insertPendingTasks(NEW_CONFIG, ImmutableSet.of(100));
+ insertPendingTasks(NEW_CONFIG, ImmutableSet.of(100));
// Instance 2 is updated
changeState(JOB, 2, KILLED, ASSIGNED, STARTING, RUNNING);
@@ -873,7 +894,7 @@ public class JobUpdaterIT extends EasyMockTest {
control.replay();
IJobUpdate update = makeJobUpdate();
- stateManager.insertPendingTasks(NEW_CONFIG, ImmutableSet.of(0, 1));
+ insertPendingTasks(NEW_CONFIG, ImmutableSet.of(0, 1));
changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
[2/2] incubator-aurora git commit: Require StateManager callers to
open their own transactions.
Posted by wf...@apache.org.
Require StateManager callers to open their own transactions.
Reviewed at https://reviews.apache.org/r/26899/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e9f135dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e9f135dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e9f135dc
Branch: refs/heads/master
Commit: e9f135dc0d9b78d5480c4b37373ce75b29a3d889
Parents: 316f291
Author: Bill Farner <wf...@apache.org>
Authored: Thu Nov 13 18:32:11 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Nov 13 18:32:11 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/UserTaskLauncher.java | 26 +-
.../aurora/scheduler/async/Preemptor.java | 28 +-
.../scheduler/async/TaskHistoryPruner.java | 9 +-
.../aurora/scheduler/async/TaskScheduler.java | 17 +-
.../aurora/scheduler/async/TaskThrottler.java | 20 +-
.../aurora/scheduler/async/TaskTimeout.java | 85 ++--
.../scheduler/cron/quartz/AuroraCronJob.java | 36 +-
.../scheduler/state/MaintenanceController.java | 1 +
.../aurora/scheduler/state/StateManager.java | 14 +-
.../scheduler/state/StateManagerImpl.java | 402 +++++++++----------
.../aurora/scheduler/state/TaskAssigner.java | 9 +-
.../thrift/SchedulerThriftInterface.java | 103 +++--
.../scheduler/updater/InstanceAction.java | 3 +
.../updater/InstanceActionHandler.java | 18 +-
.../updater/JobUpdateControllerImpl.java | 68 ++--
.../aurora/scheduler/UserTaskLauncherTest.java | 9 +-
.../scheduler/async/PreemptorImplTest.java | 1 +
.../scheduler/async/TaskHistoryPrunerTest.java | 6 +-
.../scheduler/async/TaskSchedulerImplTest.java | 12 +-
.../scheduler/async/TaskSchedulerTest.java | 46 ++-
.../scheduler/async/TaskThrottlerTest.java | 12 +-
.../aurora/scheduler/async/TaskTimeoutTest.java | 16 +-
.../cron/quartz/AuroraCronJobTest.java | 24 +-
.../state/MaintenanceControllerImplTest.java | 1 +
.../scheduler/state/StateManagerImplTest.java | 76 +++-
.../scheduler/state/TaskAssignerImplTest.java | 12 +-
.../thrift/SchedulerThriftInterfaceTest.java | 22 +-
.../aurora/scheduler/updater/AddTaskTest.java | 9 +-
.../aurora/scheduler/updater/JobUpdaterIT.java | 49 ++-
29 files changed, 695 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index e1b7d05..80f1d83 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.TaskStatus;
@@ -45,11 +46,13 @@ class UserTaskLauncher implements TaskLauncher {
@VisibleForTesting
static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
+ private final Storage storage;
private final OfferQueue offerQueue;
private final StateManager stateManager;
@Inject
- UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
+ UserTaskLauncher(Storage storage, OfferQueue offerQueue, StateManager stateManager) {
+ this.storage = requireNonNull(storage);
this.offerQueue = requireNonNull(offerQueue);
this.stateManager = requireNonNull(stateManager);
}
@@ -63,14 +66,14 @@ class UserTaskLauncher implements TaskLauncher {
}
@Override
- public synchronized boolean statusUpdate(TaskStatus status) {
+ public synchronized boolean statusUpdate(final TaskStatus status) {
@Nullable String message = null;
if (status.hasMessage()) {
message = status.getMessage();
}
try {
- ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+ final ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
// TODO(William Farner): Remove this hack once Mesos API change is done.
// Tracked by: https://issues.apache.org/jira/browse/MESOS-343
if (translatedState == ScheduleStatus.FAILED
@@ -79,11 +82,18 @@ class UserTaskLauncher implements TaskLauncher {
message = MEMORY_LIMIT_DISPLAY;
}
- stateManager.changeState(
- status.getTaskId().getValue(),
- Optional.<ScheduleStatus>absent(),
- translatedState,
- Optional.fromNullable(message));
+ final String auditMessage = message;
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.changeState(
+ storeProvider,
+ status.getTaskId().getValue(),
+ Optional.<ScheduleStatus>absent(),
+ translatedState,
+ Optional.fromNullable(auditMessage));
+ }
+ });
} catch (SchedulerException e) {
LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 1d337f6..ff26c49 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -65,6 +65,8 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import static org.apache.aurora.scheduler.storage.Storage.Work;
@@ -351,7 +353,7 @@ public interface Preemptor {
return Optional.absent();
}
- IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
+ final IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
@@ -371,21 +373,27 @@ public interface Preemptor {
.build();
for (String slaveID : allSlaves) {
- Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
+ final Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
slavesToActiveTasks.get(slaveID),
slavesToOffers.get(slaveID),
pendingTask,
attributeAggregate);
if (toPreemptTasks.isPresent()) {
- for (IAssignedTask toPreempt : toPreemptTasks.get()) {
- stateManager.changeState(
- toPreempt.getTaskId(),
- Optional.<ScheduleStatus>absent(),
- PREEMPTING,
- Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
- tasksPreempted.incrementAndGet();
- }
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+ stateManager.changeState(
+ storeProvider,
+ toPreempt.getTaskId(),
+ Optional.<ScheduleStatus>absent(),
+ PREEMPTING,
+ Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
+ tasksPreempted.incrementAndGet();
+ }
+ }
+ });
return Optional.of(slaveID);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
index 345cd89..58d074b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -121,9 +121,14 @@ public class TaskHistoryPruner implements EventSubscriber {
}
}
- private void deleteTasks(Set<String> taskIds) {
+ private void deleteTasks(final Set<String> taskIds) {
LOG.info("Pruning inactive tasks " + taskIds);
- stateManager.deleteTasks(taskIds);
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.deleteTasks(storeProvider, taskIds);
+ }
+ });
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index e2ba8b8..6bfa3ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -128,10 +128,14 @@ public interface TaskScheduler extends EventSubscriber {
}
private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
+ final MutableStoreProvider storeProvider,
final AttributeAggregate attributeAggregate,
final String taskId,
final IScheduledTask task) {
+ // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
+ // and perform the assignment at the very end. This will allow us to use optimistic locking
+ // at the top of the stack and avoid holding the write lock for too long.
return new Function<HostOffer, Optional<TaskInfo>>() {
@Override
public Optional<TaskInfo> apply(HostOffer offer) {
@@ -140,14 +144,14 @@ public interface TaskScheduler extends EventSubscriber {
if (reservedTaskId.isPresent()) {
if (taskId.equals(reservedTaskId.get())) {
// Slave is reserved to satisfy this task.
- return assigner.maybeAssign(offer, task, attributeAggregate);
+ return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
} else {
// Slave is reserved for another task.
return Optional.absent();
}
} else {
// Slave is not reserved.
- return assigner.maybeAssign(offer, task, attributeAggregate);
+ return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
}
}
};
@@ -194,7 +198,7 @@ public interface TaskScheduler extends EventSubscriber {
AttributeAggregate aggregate =
getJobState(store, Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
try {
- if (!offerQueue.launchFirst(getAssignerFunction(aggregate, taskId, task))) {
+ if (!offerQueue.launchFirst(getAssignerFunction(store, aggregate, taskId, task))) {
// Task could not be scheduled.
maybePreemptFor(taskId, aggregate);
attemptsNoMatch.incrementAndGet();
@@ -209,7 +213,12 @@ public interface TaskScheduler extends EventSubscriber {
// It is in the LOST state and a new task will move to PENDING to replace it.
// Should the state change fail due to storage issues, that's okay. The task will
// time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
+ stateManager.changeState(
+ store,
+ taskId,
+ Optional.of(PENDING),
+ LOST,
+ LAUNCH_FAILED_MSG);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
index ca6129c..f0dea48 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
import static java.util.Objects.requireNonNull;
@@ -44,6 +45,7 @@ class TaskThrottler implements EventSubscriber {
private final RescheduleCalculator rescheduleCalculator;
private final Clock clock;
private final ScheduledExecutorService executor;
+ private final Storage storage;
private final StateManager stateManager;
private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
@@ -53,11 +55,13 @@ class TaskThrottler implements EventSubscriber {
RescheduleCalculator rescheduleCalculator,
Clock clock,
ScheduledExecutorService executor,
+ Storage storage,
StateManager stateManager) {
this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
this.clock = requireNonNull(clock);
this.executor = requireNonNull(executor);
+ this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
}
@@ -72,11 +76,17 @@ class TaskThrottler implements EventSubscriber {
new Runnable() {
@Override
public void run() {
- stateManager.changeState(
- stateChange.getTaskId(),
- Optional.of(THROTTLED),
- PENDING,
- Optional.<String>absent());
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.changeState(
+ storeProvider,
+ stateChange.getTaskId(),
+ Optional.of(THROTTLED),
+ PENDING,
+ Optional.<String>absent());
+ }
+ });
}
},
delayMs,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 8217c51..90e6149 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -33,9 +33,12 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
+
/**
* Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck
* tasks will be transitioned to the LOST state.
@@ -61,6 +64,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
ScheduleStatus.DRAINING);
private final ScheduledExecutorService executor;
+ private final Storage storage;
private final StateManager stateManager;
private final Amount<Long, Time> timeout;
private final AtomicLong timedOutTasks;
@@ -68,11 +72,13 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Inject
TaskTimeout(
ScheduledExecutorService executor,
+ Storage storage,
StateManager stateManager,
Amount<Long, Time> timeout,
StatsProvider statsProvider) {
this.executor = requireNonNull(executor);
+ this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
this.timeout = requireNonNull(timeout);
this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
@@ -93,41 +99,56 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
// Nothing to do for shutting down.
}
+ private class TimedOutTaskHandler implements Runnable {
+ private final String taskId;
+ private final ScheduleStatus newState;
+
+ TimedOutTaskHandler(String taskId, ScheduleStatus newState) {
+ this.taskId = taskId;
+ this.newState = newState;
+ }
+
+ @Override
+ public void run() {
+ if (isRunning()) {
+ // This query acts as a CAS by including the state that we expect the task to be in
+ // if the timeout is still valid. Ideally, the future would have already been
+ // canceled, but in the event of a state transition race, including transientState
+ // prevents an unintended task timeout.
+ // Note: This requires LOST transitions trigger Driver.killTask.
+ boolean movedToLost = storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override
+ public Boolean apply(Storage.MutableStoreProvider storeProvider) {
+ return stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.of(newState),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE);
+ }
+ });
+
+ if (movedToLost) {
+ LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+ timedOutTasks.incrementAndGet();
+ }
+ } else {
+ // Our service is not yet started. We don't want to lose track of the task, so
+ // we will try again later.
+ LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
+ executor.schedule(
+ this,
+ NOT_STARTED_RETRY.getValue(),
+ NOT_STARTED_RETRY.getUnit().getTimeUnit());
+ }
+ }
+ }
+
@Subscribe
public void recordStateChange(TaskStateChange change) {
- final String taskId = change.getTaskId();
- final ScheduleStatus newState = change.getNewState();
- if (isTransient(newState)) {
+ if (isTransient(change.getNewState())) {
executor.schedule(
- new Runnable() {
- @Override
- public void run() {
- if (isRunning()) {
- // This query acts as a CAS by including the state that we expect the task to be in
- // if the timeout is still valid. Ideally, the future would have already been
- // canceled, but in the event of a state transition race, including transientState
- // prevents an unintended task timeout.
- // Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(
- taskId,
- Optional.of(newState),
- ScheduleStatus.LOST,
- TIMEOUT_MESSAGE)) {
-
- LOG.info("Timeout reached for task " + taskId + ":" + taskId);
- timedOutTasks.incrementAndGet();
- }
- } else {
- // Our service is not yet started. We don't want to lose track of the task, so
- // we will try again later.
- LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
- executor.schedule(
- this,
- NOT_STARTED_RETRY.getValue(),
- NOT_STARTED_RETRY.getUnit().getTimeUnit());
- }
- }
- },
+ new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
timeout.getValue(),
timeout.getUnit().getTimeUnit());
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 9388657..84e37e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -152,7 +152,7 @@ class AuroraCronJob implements Job {
ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
if (activeTasks.isEmpty()) {
- stateManager.insertPendingTasks(task, instanceIds);
+ stateManager.insertPendingTasks(storeProvider, task, instanceIds);
return Optional.absent();
}
@@ -182,13 +182,20 @@ class AuroraCronJob implements Job {
return;
}
- for (String taskId : deferredLaunch.get().activeTaskIds) {
- stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- KILLING,
- KILL_AUDIT_MESSAGE);
- }
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ for (String taskId : deferredLaunch.get().activeTaskIds) {
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ KILLING,
+ KILL_AUDIT_MESSAGE);
+ }
+ }
+ });
+
LOG.info(String.format("Waiting for job to terminate before launching cron job %s.", path));
final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
@@ -200,9 +207,16 @@ class AuroraCronJob implements Job {
public Boolean get() {
if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
LOG.info("Initiating delayed launch of cron " + path);
- stateManager.insertPendingTasks(
- deferredLaunch.get().task,
- deferredLaunch.get().instanceIds);
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ stateManager.insertPendingTasks(
+ storeProvider,
+ deferredLaunch.get().task,
+ deferredLaunch.get().instanceIds);
+ }
+ });
+
return true;
} else {
LOG.info("Not yet safe to run cron " + path);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 86440eb..a835eaa 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -127,6 +127,7 @@ public interface MaintenanceController {
LOG.info(String.format("Draining tasks: %s on host: %s", activeTasks, host));
for (String taskId : activeTasks) {
stateManager.changeState(
+ store,
taskId,
Optional.<ScheduleStatus>absent(),
ScheduleStatus.DRAINING,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 3a2fd27..50ff4ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -22,6 +22,8 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos.SlaveID;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
/**
* A manager for the state of tasks. Most modifications to tasks should be made here, especially
* those that alter the {@link ScheduleStatus} of tasks.
@@ -47,6 +49,7 @@ public interface StateManager {
*
*/
boolean changeState(
+ MutableStoreProvider storeProvider,
String taskId,
Optional<ScheduleStatus> casState,
ScheduleStatus newState,
@@ -56,6 +59,7 @@ public interface StateManager {
* Assigns a task to a specific slave.
* This will modify the task record to reflect the host assignment and return the updated record.
*
+ * @param storeProvider Storage provider.
* @param taskId ID of the task to mutate.
* @param slaveHost Host name that the task is being assigned to.
* @param slaveId ID of the slave that the task is being assigned to.
@@ -63,6 +67,7 @@ public interface StateManager {
* @return The updated task record, or {@code null} if the task was not found.
*/
IAssignedTask assignTask(
+ MutableStoreProvider storeProvider,
String taskId,
String slaveHost,
SlaveID slaveId,
@@ -72,17 +77,22 @@ public interface StateManager {
* Inserts pending instances using {@code task} as their configuration. Tasks will immediately
* move into PENDING and will be eligible for scheduling.
*
+ * @param storeProvider Storage provider.
* @param task Task template.
* @param instanceIds Instance IDs to assign to new PENDING tasks.
*/
- void insertPendingTasks(ITaskConfig task, Set<Integer> instanceIds);
+ void insertPendingTasks(
+ MutableStoreProvider storeProvider,
+ ITaskConfig task,
+ Set<Integer> instanceIds);
/**
* Attempts to delete tasks from the task store.
* If the task is not currently in a state that is considered safe for deletion,
* side-effect actions will be performed to reconcile the state conflict.
*
+ * @param storeProvider Storage provider.
* @param taskIds IDs of tasks to delete.
*/
- void deleteTasks(final Set<String> taskIds);
+ void deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6663555..bd6a05f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -53,9 +53,7 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.SideEffect.Action;
-import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -79,7 +77,6 @@ import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
- private final Storage storage;
private final Clock clock;
private final Driver driver;
private final TaskIdGenerator taskIdGenerator;
@@ -88,14 +85,12 @@ public class StateManagerImpl implements StateManager {
@Inject
StateManagerImpl(
- final Storage storage,
final Clock clock,
Driver driver,
TaskIdGenerator taskIdGenerator,
EventSink eventSink,
RescheduleCalculator rescheduleCalculator) {
- this.storage = requireNonNull(storage);
this.clock = requireNonNull(clock);
this.driver = requireNonNull(driver);
this.taskIdGenerator = requireNonNull(taskIdGenerator);
@@ -114,12 +109,17 @@ public class StateManagerImpl implements StateManager {
}
@Override
- public void insertPendingTasks(final ITaskConfig task, final Set<Integer> instanceIds) {
+ public void insertPendingTasks(
+ MutableStoreProvider storeProvider,
+ final ITaskConfig task,
+ Set<Integer> instanceIds) {
+
+ requireNonNull(storeProvider);
requireNonNull(task);
checkNotBlank(instanceIds);
// Done outside the write transaction to minimize the work done inside a transaction.
- final Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
+ Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
.transform(new Function<Integer, IScheduledTask>() {
@Override
public IScheduledTask apply(Integer instanceId) {
@@ -127,45 +127,48 @@ public class StateManagerImpl implements StateManager {
}
}).toSet();
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
- Query.jobScoped(task.getJob()).active());
+ ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
+ Query.jobScoped(task.getJob()).active());
- Set<Integer> existingInstanceIds =
- FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
+ Set<Integer> existingInstanceIds =
+ FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
- if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
- throw new IllegalArgumentException("Instance ID collision detected.");
- }
+ if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
+ throw new IllegalArgumentException("Instance ID collision detected.");
+ }
- storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
+ storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
- for (IScheduledTask task : scheduledTasks) {
- updateTaskAndExternalState(
- Tasks.id(task),
- Optional.of(task),
- Optional.of(PENDING),
- Optional.<String>absent());
- }
- }
- });
+ for (IScheduledTask scheduledTask : scheduledTasks) {
+ updateTaskAndExternalState(
+ storeProvider.getUnsafeTaskStore(),
+ Tasks.id(scheduledTask),
+ Optional.of(scheduledTask),
+ Optional.of(PENDING),
+ Optional.<String>absent());
+ }
}
@Override
public boolean changeState(
+ MutableStoreProvider storeProvider,
String taskId,
Optional<ScheduleStatus> casState,
final ScheduleStatus newState,
final Optional<String> auditMessage) {
- return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
+ return updateTaskAndExternalState(
+ storeProvider.getUnsafeTaskStore(),
+ casState,
+ taskId,
+ newState,
+ auditMessage);
}
@Override
public IAssignedTask assignTask(
- final String taskId,
+ MutableStoreProvider storeProvider,
+ String taskId,
final String slaveHost,
final SlaveID slaveId,
final Set<Integer> assignedPorts) {
@@ -175,39 +178,35 @@ public class StateManagerImpl implements StateManager {
requireNonNull(slaveId);
requireNonNull(assignedPorts);
- return storage.write(new MutateWork.Quiet<IAssignedTask>() {
- @Override
- public IAssignedTask apply(MutableStoreProvider storeProvider) {
- boolean success = updateTaskAndExternalState(
- Optional.<ScheduleStatus>absent(),
- taskId,
- ASSIGNED,
- Optional.<String>absent());
-
- Preconditions.checkState(
- success,
- "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
- Query.Builder query = Query.taskScoped(taskId);
- storeProvider.getUnsafeTaskStore().mutateTasks(query,
- new Function<IScheduledTask, IScheduledTask>() {
- @Override
- public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- AssignedTask assigned = builder.getAssignedTask();
- assigned.setAssignedPorts(
- getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
- assigned.setSlaveHost(slaveHost)
- .setSlaveId(slaveId.getValue());
- return IScheduledTask.build(builder);
- }
- });
-
- return Iterables.getOnlyElement(
- Iterables.transform(
- storeProvider.getTaskStore().fetchTasks(query),
- Tasks.SCHEDULED_TO_ASSIGNED));
- }
- });
+ boolean success = updateTaskAndExternalState(
+ storeProvider.getUnsafeTaskStore(),
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ ASSIGNED,
+ Optional.<String>absent());
+
+ Preconditions.checkState(
+ success,
+ "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+ Query.Builder query = Query.taskScoped(taskId);
+ storeProvider.getUnsafeTaskStore().mutateTasks(query,
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override
+ public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask builder = task.newBuilder();
+ AssignedTask assigned = builder.getAssignedTask();
+ assigned.setAssignedPorts(
+ getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+ assigned.setSlaveHost(slaveHost)
+ .setSlaveId(slaveId.getValue());
+ return IScheduledTask.build(builder);
+ }
+ });
+
+ return Iterables.getOnlyElement(
+ Iterables.transform(
+ storeProvider.getTaskStore().fetchTasks(query),
+ Tasks.SCHEDULED_TO_ASSIGNED));
}
private static Map<String, Integer> getNameMappedPorts(
@@ -250,32 +249,29 @@ public class StateManagerImpl implements StateManager {
});
private boolean updateTaskAndExternalState(
- final Optional<ScheduleStatus> casState,
- final String taskId,
- final ScheduleStatus targetState,
- final Optional<String> transitionMessage) {
+ TaskStore.Mutable taskStore,
+ Optional<ScheduleStatus> casState,
+ String taskId,
+ ScheduleStatus targetState,
+ Optional<String> transitionMessage) {
- return storage.write(new MutateWork.Quiet<Boolean>() {
- @Override
- public Boolean apply(MutableStoreProvider storeProvider) {
- Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
- null));
+ Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+ taskStore.fetchTasks(Query.taskScoped(taskId)),
+ null));
- // CAS operation fails if the task does not exist, or the states don't match.
- if (casState.isPresent()
- && (!task.isPresent() || casState.get() != task.get().getStatus())) {
+ // CAS operation fails if the task does not exist, or the states don't match.
+ if (casState.isPresent()
+ && (!task.isPresent() || casState.get() != task.get().getStatus())) {
- return false;
- }
+ return false;
+ }
- return updateTaskAndExternalState(
- taskId,
- task,
- Optional.of(targetState),
- transitionMessage);
- }
- });
+ return updateTaskAndExternalState(
+ taskStore,
+ taskId,
+ task,
+ Optional.of(targetState),
+ transitionMessage);
}
private static final Function<SideEffect, Action> GET_ACTION =
@@ -308,7 +304,8 @@ public class StateManagerImpl implements StateManager {
Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
private boolean updateTaskAndExternalState(
- final String taskId,
+ TaskStore.Mutable taskStore,
+ String taskId,
// Note: This argument is deliberately non-final, and should not be made final.
// This is because using the captured value within the storage operation below is
// highly-risky, since it doesn't necessarily represent the value in storage.
@@ -327,111 +324,106 @@ public class StateManagerImpl implements StateManager {
? new TaskStateMachine(task.get())
: new TaskStateMachine(taskId);
- boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
- @Override
- public Boolean apply(MutableStoreProvider storeProvider) {
- TransitionResult result = stateMachine.updateState(targetState);
- Query.Builder query = Query.taskScoped(taskId);
-
- for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
- Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
- Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
-
- switch (sideEffect.getAction()) {
- case INCREMENT_FAILURES:
- storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
- @Override
- public IScheduledTask apply(IScheduledTask task) {
- return IScheduledTask.build(
- task.newBuilder().setFailureCount(task.getFailureCount() + 1));
- }
- });
- break;
-
- case SAVE_STATE:
- Preconditions.checkState(
- upToDateTask.isPresent(),
- "Operation expected task " + taskId + " to be present.");
-
- storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
- @Override
- public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask mutableTask = task.newBuilder();
- mutableTask.setStatus(targetState.get());
- mutableTask.addToTaskEvents(new TaskEvent()
- .setTimestamp(clock.nowMillis())
- .setStatus(targetState.get())
- .setMessage(transitionMessage.orNull())
- .setScheduler(LOCAL_HOST_SUPPLIER.get()));
- return IScheduledTask.build(mutableTask);
- }
- });
- events.add(
- PubsubEvent.TaskStateChange.transition(
- Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
- stateMachine.getPreviousState()));
- break;
-
- case STATE_CHANGE:
- updateTaskAndExternalState(
- Optional.<ScheduleStatus>absent(),
- taskId,
- sideEffect.getNextState().get(),
- Optional.<String>absent());
- break;
-
- case RESCHEDULE:
- Preconditions.checkState(
- upToDateTask.isPresent(),
- "Operation expected task " + taskId + " to be present.");
- LOG.info("Task being rescheduled: " + taskId);
-
- ScheduleStatus newState;
- String auditMessage;
- long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
- if (flapPenaltyMs > 0) {
- newState = THROTTLED;
- auditMessage =
- String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
- } else {
- newState = PENDING;
- auditMessage = "Rescheduled";
- }
-
- IScheduledTask newTask = IScheduledTask.build(createTask(
- upToDateTask.get().getAssignedTask().getInstanceId(),
- upToDateTask.get().getAssignedTask().getTask())
- .newBuilder()
- .setFailureCount(upToDateTask.get().getFailureCount())
- .setAncestorId(taskId));
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
- updateTaskAndExternalState(
- Tasks.id(newTask),
- Optional.of(newTask),
- Optional.of(newState),
- Optional.of(auditMessage));
- break;
-
- case KILL:
- driver.killTask(taskId);
- break;
-
- case DELETE:
- Preconditions.checkState(
- upToDateTask.isPresent(),
- "Operation expected task " + taskId + " to be present.");
-
- events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
- break;
-
- default:
- throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+ TransitionResult result = stateMachine.updateState(targetState);
+ Query.Builder query = Query.taskScoped(taskId);
+
+ for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+ Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+ Iterables.getOnlyElement(taskStore.fetchTasks(query), null));
+
+ switch (sideEffect.getAction()) {
+ case INCREMENT_FAILURES:
+ taskStore.mutateTasks(query, new TaskMutation() {
+ @Override
+ public IScheduledTask apply(IScheduledTask task) {
+ return IScheduledTask.build(
+ task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+ }
+ });
+ break;
+
+ case SAVE_STATE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+
+ taskStore.mutateTasks(query, new TaskMutation() {
+ @Override
+ public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask mutableTask = task.newBuilder();
+ mutableTask.setStatus(targetState.get());
+ mutableTask.addToTaskEvents(new TaskEvent()
+ .setTimestamp(clock.nowMillis())
+ .setStatus(targetState.get())
+ .setMessage(transitionMessage.orNull())
+ .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+ return IScheduledTask.build(mutableTask);
+ }
+ });
+ events.add(
+ PubsubEvent.TaskStateChange.transition(
+ Iterables.getOnlyElement(taskStore.fetchTasks(query)),
+ stateMachine.getPreviousState()));
+ break;
+
+ case STATE_CHANGE:
+ updateTaskAndExternalState(
+ taskStore,
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ sideEffect.getNextState().get(),
+ Optional.<String>absent());
+ break;
+
+ case RESCHEDULE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+ LOG.info("Task being rescheduled: " + taskId);
+
+ ScheduleStatus newState;
+ String auditMessage;
+ long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
+ if (flapPenaltyMs > 0) {
+ newState = THROTTLED;
+ auditMessage =
+ String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+ } else {
+ newState = PENDING;
+ auditMessage = "Rescheduled";
}
- }
- return result.isSuccess();
+ IScheduledTask newTask = IScheduledTask.build(createTask(
+ upToDateTask.get().getAssignedTask().getInstanceId(),
+ upToDateTask.get().getAssignedTask().getTask())
+ .newBuilder()
+ .setFailureCount(upToDateTask.get().getFailureCount())
+ .setAncestorId(taskId));
+ taskStore.saveTasks(ImmutableSet.of(newTask));
+ updateTaskAndExternalState(
+ taskStore,
+ Tasks.id(newTask),
+ Optional.of(newTask),
+ Optional.of(newState),
+ Optional.of(auditMessage));
+ break;
+
+ case KILL:
+ driver.killTask(taskId);
+ break;
+
+ case DELETE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+
+ events.add(deleteTasks(taskStore, ImmutableSet.of(taskId)));
+ break;
+
+ default:
+ throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
}
- });
+ }
// Note (AURORA-138): Delaying events until after the write operation is somewhat futile, since
// the state may actually not be written to durable store
@@ -442,32 +434,26 @@ public class StateManagerImpl implements StateManager {
eventSink.post(event);
}
- return success;
+ return result.isSuccess();
}
@Override
- public void deleteTasks(final Set<String> taskIds) {
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(final MutableStoreProvider storeProvider) {
-
- Map<String, IScheduledTask> tasks = Maps.uniqueIndex(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
- Tasks.SCHEDULED_TO_ID);
-
- for (Map.Entry<String, IScheduledTask> entry : tasks.entrySet()) {
- updateTaskAndExternalState(
- entry.getKey(),
- Optional.of(entry.getValue()),
- Optional.<ScheduleStatus>absent(),
- Optional.<String>absent());
- }
- }
- });
+ public void deleteTasks(MutableStoreProvider storeProvider, final Set<String> taskIds) {
+ Map<String, IScheduledTask> tasks = Maps.uniqueIndex(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
+ Tasks.SCHEDULED_TO_ID);
+
+ for (Map.Entry<String, IScheduledTask> entry : tasks.entrySet()) {
+ updateTaskAndExternalState(
+ storeProvider.getUnsafeTaskStore(),
+ entry.getKey(),
+ Optional.of(entry.getValue()),
+ Optional.<ScheduleStatus>absent(),
+ Optional.<String>absent());
+ }
}
- private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ private static PubsubEvent deleteTasks(TaskStore.Mutable taskStore, Set<String> taskIds) {
Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
taskStore.deleteTasks(taskIds);
return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 77db411..4abc7ba 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -34,6 +34,7 @@ import org.apache.mesos.Protos.TaskInfo;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
/**
@@ -45,12 +46,14 @@ public interface TaskAssigner {
* Tries to match a task against an offer. If a match is found, the assigner should
* make the appropriate changes to the task and provide a non-empty result.
*
+ * @param storeProvider Storage provider.
* @param offer The resource offer.
* @param task The task to match against and optionally assign.
* @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
* @return Instructions for launching the task if matching and assignment were successful.
*/
Optional<TaskInfo> maybeAssign(
+ MutableStoreProvider storeProvider,
HostOffer offer,
IScheduledTask task,
AttributeAggregate attributeAggregate);
@@ -73,11 +76,12 @@ public interface TaskAssigner {
this.taskFactory = requireNonNull(taskFactory);
}
- private TaskInfo assign(Offer offer, IScheduledTask task) {
+ private TaskInfo assign(MutableStoreProvider storeProvider, Offer offer, IScheduledTask task) {
String host = offer.getHostname();
Set<Integer> selectedPorts =
Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
IAssignedTask assigned = stateManager.assignTask(
+ storeProvider,
Tasks.id(task),
host,
offer.getSlaveId(),
@@ -89,6 +93,7 @@ public interface TaskAssigner {
@Override
public Optional<TaskInfo> maybeAssign(
+ MutableStoreProvider storeProvider,
HostOffer offer,
IScheduledTask task,
AttributeAggregate attributeAggregate) {
@@ -100,7 +105,7 @@ public interface TaskAssigner {
Tasks.id(task),
attributeAggregate);
if (vetoes.isEmpty()) {
- return Optional.of(assign(offer.getOffer(), task));
+ return Optional.of(assign(storeProvider, offer.getOffer(), task));
} else {
LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+ ": " + vetoes);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index f081bf3..f0b4975 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -320,7 +320,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
cronJobManager.createJob(SanitizedCronJob.from(sanitized));
} else {
LOG.info("Launching " + count + " tasks.");
- stateManager.insertPendingTasks(template, sanitized.getInstanceIds());
+ stateManager.insertPendingTasks(
+ storeProvider,
+ template,
+ sanitized.getInstanceIds());
}
return okEmptyResponse();
} catch (LockException e) {
@@ -763,12 +766,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
final Set<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
- Optional<SessionContext> context = isAdmin(session);
- if (context.isPresent()) {
+ Optional<SessionContext> maybeAdminContext = isAdmin(session);
+ final SessionContext context;
+ if (maybeAdminContext.isPresent()) {
LOG.info("Granting kill query to admin user: " + query);
+ context = maybeAdminContext.get();
} else {
try {
- context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
+ context = validateSessionKeyForTasks(session, query, tasks);
} catch (AuthFailedException e) {
return errorResponse(AUTH_FAILED, e);
}
@@ -784,7 +789,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
LOG.info("Killing tasks matching " + query);
- boolean tasksKilled = false;
+ final boolean cronJobKilled;
if (isSingleJobScoped) {
// If this looks like a query for all tasks in a job, instruct the cron
// scheduler to delete it.
@@ -792,19 +797,30 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
LOG.warning("Deprecated behavior: descheduling job " + jobKey
+ " with cron via killTasks. (See AURORA-454)");
- tasksKilled = cronJobManager.deleteJob(jobKey);
+ cronJobKilled = cronJobManager.deleteJob(jobKey);
+ } else {
+ cronJobKilled = false;
}
- for (String taskId : Tasks.ids(tasks)) {
- tasksKilled |= stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- ScheduleStatus.KILLING,
- killedByMessage(context.get().getIdentity()));
- }
+ final boolean tasksKilled = storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override
+ public Boolean apply(MutableStoreProvider storeProvider) {
+ boolean match = false;
+ for (String taskId : Tasks.ids(tasks)) {
+ match |= stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.KILLING,
+ killedByMessage(context.getIdentity()));
+ }
+ return match;
+ }
+ });
- return tasksKilled
- ? okEmptyResponse() : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
+ return cronJobKilled || tasksKilled
+ ? okEmptyResponse()
+ : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
}
});
}
@@ -839,19 +855,25 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
}
Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
- Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+ final Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
if (matchingTasks.size() != shardIds.size()) {
return invalidResponse("Not all requested shards are active.");
}
LOG.info("Restarting shards matching " + query);
- for (String taskId : Tasks.ids(matchingTasks)) {
- stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- ScheduleStatus.RESTARTING,
- restartedByMessage(context.getIdentity()));
- }
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ for (String taskId : Tasks.ids(matchingTasks)) {
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.RESTARTING,
+ restartedByMessage(context.getIdentity()));
+ }
+ }
+ });
return okEmptyResponse();
}
});
@@ -919,12 +941,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
}
@Override
- public Response forceTaskState(String taskId, ScheduleStatus status, SessionKey session) {
+ public Response forceTaskState(
+ final String taskId,
+ final ScheduleStatus status,
+ SessionKey session) {
+
checkNotBlank(taskId);
requireNonNull(status);
requireNonNull(session);
- SessionContext context;
+ final SessionContext context;
try {
// TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
@@ -932,11 +958,17 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
return errorResponse(AUTH_FAILED, e);
}
- stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- status,
- transitionMessage(context.getIdentity()));
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ status,
+ transitionMessage(context.getIdentity()));
+ }
+ });
return okEmptyResponse();
}
@@ -1204,7 +1236,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
currentTasks.size() + config.getInstanceIdsSize(),
quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize()));
- stateManager.insertPendingTasks(task, ImmutableSet.copyOf(config.getInstanceIds()));
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ stateManager.insertPendingTasks(
+ storeProvider,
+ task,
+ ImmutableSet.copyOf(config.getInstanceIds()));
+ }
+ });
+
return okEmptyResponse();
} catch (LockException e) {
return errorResponse(LOCK_ERROR, e);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
index 3774c85..b553f97 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
@@ -21,6 +21,9 @@ import static org.apache.aurora.scheduler.updater.InstanceActionHandler.WatchRun
enum InstanceAction {
KILL_TASK(Optional.<InstanceActionHandler>of(new KillTask())),
+ // TODO(wfarner): Build this action into the scheduler state machine instead. Rather than
+ // killing a task and re-creating it, choose the updated or rolled-back task when we are
+ // deciding to reschedule the task.
ADD_TASK(Optional.<InstanceActionHandler>of(new AddTask())),
WATCH_TASK(Optional.<InstanceActionHandler>of(new WatchRunningTask())),
AWAIT_STATE_CHANGE(Optional.<InstanceActionHandler>absent());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
index f4363aa..3b9919d 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -27,7 +27,6 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
@@ -35,13 +34,14 @@ import org.apache.aurora.scheduler.storage.entities.IRange;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
interface InstanceActionHandler {
Amount<Long, Time> getReevaluationDelay(
IInstanceKey instance,
IJobUpdateInstructions instructions,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
StateManager stateManager,
JobUpdateStatus status);
@@ -73,7 +73,7 @@ interface InstanceActionHandler {
public Amount<Long, Time> getReevaluationDelay(
IInstanceKey instance,
IJobUpdateInstructions instructions,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
StateManager stateManager,
JobUpdateStatus status) {
@@ -82,7 +82,10 @@ interface InstanceActionHandler {
instructions,
status == ROLLING_FORWARD,
instance.getInstanceId());
- stateManager.insertPendingTasks(replacement, ImmutableSet.of(instance.getInstanceId()));
+ stateManager.insertPendingTasks(
+ storeProvider,
+ replacement,
+ ImmutableSet.of(instance.getInstanceId()));
return Amount.of(
(long) instructions.getSettings().getMaxWaitToInstanceRunningMs(),
Time.MILLISECONDS);
@@ -94,14 +97,15 @@ interface InstanceActionHandler {
public Amount<Long, Time> getReevaluationDelay(
IInstanceKey instance,
IJobUpdateInstructions instructions,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
StateManager stateManager,
JobUpdateStatus status) {
String taskId = Tasks.id(Iterables.getOnlyElement(
- taskStore.fetchTasks(Query.instanceScoped(instance).active())));
+ storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active())));
LOG.info("Killing " + instance + " while " + status);
stateManager.changeState(
+ storeProvider,
taskId,
Optional.<ScheduleStatus>absent(),
ScheduleStatus.KILLING,
@@ -117,7 +121,7 @@ interface InstanceActionHandler {
public Amount<Long, Time> getReevaluationDelay(
IInstanceKey instance,
IJobUpdateInstructions instructions,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
StateManager stateManager,
JobUpdateStatus status) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index f918d15..a992938 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -72,6 +72,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
@@ -131,7 +132,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
storage.write(new MutateWork.NoResult<UpdateStateException>() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider)
+ protected void execute(MutableStoreProvider storeProvider)
throws UpdateStateException {
IJobUpdateSummary summary = update.getSummary();
@@ -159,8 +160,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
Optional.of(requireNonNull(lock.getToken())));
recordAndChangeJobUpdateStatus(
- storeProvider.getJobUpdateStore(),
- storeProvider.getTaskStore(),
+ storeProvider,
summary.getUpdateId(),
job,
ROLLING_FORWARD,
@@ -197,14 +197,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
public void systemResume() {
storage.write(new MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
for (IJobUpdateSummary summary
: storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(ACTIVE_QUERY)) {
LOG.info("Automatically resuming update " + JobKeys.canonicalString(summary.getJobKey()));
changeJobUpdateStatus(
- storeProvider.getJobUpdateStore(),
- storeProvider.getTaskStore(),
+ storeProvider,
summary.getUpdateId(),
summary.getJobKey(),
summary.getState().getStatus(),
@@ -234,15 +233,14 @@ class JobUpdateControllerImpl implements JobUpdateController {
private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
storage.write(new MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
IJobKey job = instance.getJobKey();
UpdateFactory.Update update = updates.get(job);
if (update != null) {
if (update.getUpdater().containsInstance(instance.getInstanceId())) {
LOG.info("Forwarding task change for " + InstanceKeys.toString(instance));
evaluateUpdater(
- storeProvider.getJobUpdateStore(),
- storeProvider.getTaskStore(),
+ storeProvider,
update,
getOnlyMatch(storeProvider.getJobUpdateStore(), queryByJob(job)),
ImmutableMap.of(instance.getInstanceId(), state));
@@ -291,7 +289,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
storage.write(new MutateWork.NoResult<UpdateStateException>() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider)
+ protected void execute(MutableStoreProvider storeProvider)
throws UpdateStateException {
IJobUpdateSummary update = Iterables.getOnlyElement(
@@ -302,19 +300,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
JobUpdateStatus status = update.getState().getStatus();
JobUpdateStatus newStatus = requireNonNull(stateChange.apply(status));
- changeUpdateStatus(
- storeProvider.getJobUpdateStore(),
- storeProvider.getTaskStore(),
- update,
- newStatus,
- user);
+ changeUpdateStatus(storeProvider, update, newStatus, user);
}
});
}
private void changeUpdateStatus(
- JobUpdateStore.Mutable updateStore,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
IJobUpdateSummary updateSummary,
JobUpdateStatus newStatus,
Optional<String> user) {
@@ -325,8 +317,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
assertTransitionAllowed(updateSummary.getState().getStatus(), newStatus);
recordAndChangeJobUpdateStatus(
- updateStore,
- taskStore,
+ storeProvider,
updateSummary.getUpdateId(),
updateSummary.getJobKey(),
newStatus,
@@ -334,14 +325,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
private void recordAndChangeJobUpdateStatus(
- JobUpdateStore.Mutable updateStore,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
String updateId,
IJobKey job,
JobUpdateStatus status,
Optional<String> user) {
- changeJobUpdateStatus(updateStore, taskStore, updateId, job, status, user, true);
+ changeJobUpdateStatus(storeProvider, updateId, job, status, user, true);
}
private static final Set<JobUpdateStatus> UNLOCK_STATES = ImmutableSet.of(
@@ -353,8 +343,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
);
private void changeJobUpdateStatus(
- JobUpdateStore.Mutable updateStore,
- TaskStore taskStore,
+ MutableStoreProvider storeProvider,
String updateId,
IJobKey job,
JobUpdateStatus newStatus,
@@ -364,6 +353,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
JobUpdateStatus status;
boolean record;
+ JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
Optional<String> updateLock = updateStore.getLockToken(updateId);
if (updateLock.isPresent()) {
status = newStatus;
@@ -407,13 +397,12 @@ class JobUpdateControllerImpl implements JobUpdateController {
update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
} catch (RuntimeException e) {
LOG.log(Level.WARNING, "Uncaught exception: " + e, e);
- changeJobUpdateStatus(updateStore, taskStore, updateId, job, ERROR, user, true);
+ changeJobUpdateStatus(storeProvider, updateId, job, ERROR, user, true);
return;
}
updates.put(job, update);
evaluateUpdater(
- updateStore,
- taskStore,
+ storeProvider,
update,
jobUpdate.getSummary(),
ImmutableMap.<Integer, Optional<IScheduledTask>>of());
@@ -433,8 +422,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
ImmutableSet.of(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED);
private void evaluateUpdater(
- JobUpdateStore.Mutable updateStore,
- final TaskStore taskStore,
+ final MutableStoreProvider storeProvider,
final UpdateFactory.Update update,
IJobUpdateSummary summary,
Map<Integer, Optional<IScheduledTask>> changedInstance) {
@@ -442,10 +430,10 @@ class JobUpdateControllerImpl implements JobUpdateController {
JobUpdateStatus updaterStatus = summary.getState().getStatus();
final IJobKey job = summary.getJobKey();
+ final JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
if (!updateStore.getLockToken(summary.getUpdateId()).isPresent()) {
recordAndChangeJobUpdateStatus(
- updateStore,
- taskStore,
+ storeProvider,
summary.getUpdateId(),
job,
ERROR,
@@ -457,7 +445,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
new InstanceStateProvider<Integer, Optional<IScheduledTask>>() {
@Override
public Optional<IScheduledTask> getState(Integer instanceId) {
- return getActiveInstance(taskStore, job, instanceId);
+ return getActiveInstance(storeProvider.getTaskStore(), job, instanceId);
}
};
@@ -505,16 +493,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
if (status == SUCCEEDED) {
- changeUpdateStatus(
- updateStore,
- taskStore,
+ changeUpdateStatus(storeProvider,
summary,
update.getSuccessStatus(),
Optional.<String>absent());
} else {
changeUpdateStatus(
- updateStore,
- taskStore,
+ storeProvider,
summary,
update.getFailureStatus(),
Optional.<String>absent());
@@ -531,7 +516,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
Amount<Long, Time> reevaluateDelay = handler.get().getReevaluationDelay(
instance,
updateStore.fetchJobUpdateInstructions(summary.getUpdateId()).get(),
- taskStore,
+ storeProvider,
stateManager,
updaterStatus);
executor.schedule(
@@ -575,7 +560,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
public void run() {
storage.write(new MutateWork.NoResult.Quiet() {
@Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
+ protected void execute(MutableStoreProvider storeProvider) {
IJobUpdateSummary summary =
getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdateId(updateId));
JobUpdateStatus status = summary.getState().getStatus();
@@ -583,8 +568,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
if (JobUpdateStateMachine.isActive(status)) {
UpdateFactory.Update update = updates.get(instance.getJobKey());
evaluateUpdater(
- storeProvider.getJobUpdateStore(),
- storeProvider.getTaskStore(),
+ storeProvider,
update,
summary,
ImmutableMap.of(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 4673e80..7ba9464 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.Attribute;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.OfferID;
@@ -66,6 +67,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
private OfferQueue offerQueue;
private StateManager stateManager;
+ private StorageTestUtil storageUtil;
private TaskLauncher launcher;
@@ -73,7 +75,9 @@ public class UserTaskLauncherTest extends EasyMockTest {
public void setUp() {
offerQueue = createMock(OfferQueue.class);
stateManager = createMock(StateManager.class);
- launcher = new UserTaskLauncher(offerQueue, stateManager);
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+ launcher = new UserTaskLauncher(storageUtil.storage, offerQueue, stateManager);
}
@Test
@@ -88,6 +92,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testForwardsStatusUpdates() throws Exception {
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID_A,
Optional.<ScheduleStatus>absent(),
RUNNING,
@@ -116,6 +121,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test(expected = StorageException.class)
public void testFailedStatusUpdate() throws Exception {
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID_A,
Optional.<ScheduleStatus>absent(),
RUNNING,
@@ -135,6 +141,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testMemoryLimitTranslationHack() throws Exception {
expect(stateManager.changeState(
+ storageUtil.mutableStoreProvider,
TASK_ID_A,
Optional.<ScheduleStatus>absent(),
FAILED,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 59bfbcb..8b0367e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -587,6 +587,7 @@ public class PreemptorImplTest extends EasyMockTest {
private void expectPreempted(ScheduledTask preempted) throws Exception {
expect(stateManager.changeState(
+ eq(storageUtil.mutableStoreProvider),
eq(Tasks.id(preempted)),
eq(Optional.<ScheduleStatus>absent()),
eq(ScheduleStatus.PREEMPTING),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
index 9682c89..6eaf3ce 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
@@ -287,7 +287,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
eventDispatch.setName(getClass().getName() + "-EventDispatch");
eventDispatch.start();
- stateManager.deleteTasks(ImmutableSet.of(taskId));
+ stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() {
@@ -306,7 +306,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
}
private void expectDeleteTasks(String... tasks) {
- stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
+ stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks));
}
private Capture<Runnable> expectDefaultDelayedPrune() {
@@ -342,7 +342,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
if (pruned.length > 0) {
- stateManager.deleteTasks(Tasks.ids(pruned));
+ stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 0e8a98c..9bc6a75 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -56,12 +56,14 @@ import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.TaskInfo;
import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -132,7 +134,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
private void expectAssigned(IScheduledTask task) {
- expect(assigner.maybeAssign(OFFER, task, emptyJob))
+ expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, task, emptyJob))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
}
@@ -192,7 +194,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
- expect(assigner.maybeAssign(OFFER, TASK_B, emptyJob))
+ expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, TASK_B, emptyJob))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
control.replay();
@@ -302,7 +304,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
});
Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
- expect(assigner.maybeAssign(OFFER, taskA, emptyJob))
+ expect(assigner.maybeAssign(
+ EasyMock.<MutableStoreProvider>anyObject(),
+ eq(OFFER),
+ eq(taskA),
+ eq(emptyJob)))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
control.replay();