You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by jc...@apache.org on 2017/01/04 21:51:14 UTC
aurora git commit: Reduce storage write lock contention by adopting
Double-Checked Locking pattern in TimedOutTaskHandler.
Repository: aurora
Updated Branches:
refs/heads/master 21ad18ec7 -> d4ebb56ba
Reduce storage write lock contention by adopting Double-Checked Locking pattern in
TimedOutTaskHandler.
`TimedOutTaskHandler` acquires the storage write lock for every task, every time the task
transitions to a transient state. After a default timeout period of 5 minutes, it then verifies
whether the task has transitioned out of the transient state.
The verification step takes place while holding the storage write lock. In over 99% of cases, the
logic short-circuits and returns from `StateManagerImpl.updateTaskAndExternalState()` once it learns
that the task has already transitioned out of the transient state.
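This patch reduces storage write lock contention by adopting the Double-Checked Locking pattern in
`TimedOutTaskHandler.run()`: the task's current state is first checked with a storage read, and the
storage write lock is acquired only if the task is still in the expected transient state.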
Bugs closed: AURORA-1820
Reviewed at https://reviews.apache.org/r/55179/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d4ebb56b
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d4ebb56b
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d4ebb56b
Branch: refs/heads/master
Commit: d4ebb56ba7f02b4f921d37518185af20f253a44f
Parents: 21ad18e
Author: Mehrdad Nurolahzade <me...@nurolahzade.com>
Authored: Wed Jan 4 15:50:46 2017 -0600
Committer: Joshua Cohen <jc...@apache.org>
Committed: Wed Jan 4 15:50:46 2017 -0600
----------------------------------------------------------------------
.../scheduler/reconciliation/TaskTimeout.java | 36 ++++++++++++--------
.../reconciliation/TaskTimeoutTest.java | 36 ++++++++++----------
2 files changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index 2dc9bc2..8e9a0d3 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,21 +113,26 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Override
public void run() {
if (isRunning()) {
- // This query acts as a CAS by including the state that we expect the task to be in
- // if the timeout is still valid. Ideally, the future would have already been
- // canceled, but in the event of a state transition race, including transientState
- // prevents an unintended task timeout.
- // Note: This requires LOST transitions trigger Driver.killTask.
- StateChangeResult result = storage.write(storeProvider -> stateManager.changeState(
- storeProvider,
- taskId,
- Optional.of(newState),
- ScheduleStatus.LOST,
- TIMEOUT_MESSAGE));
-
- if (result == StateChangeResult.SUCCESS) {
- LOG.info("Timeout reached for task " + taskId + ":" + taskId);
- timedOutTasks.incrementAndGet();
+ Optional<IScheduledTask> task = storage.read(
+ storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+ // Double-Checked Locking: acquire storage write lock only if necessary
+ if (task.isPresent() && task.get().getStatus() == newState) {
+ // This query acts as a CAS by including the state that we expect the task to be in
+ // if the timeout is still valid. Ideally, the future would have already been
+ // canceled, but in the event of a state transition race, including transientState
+ // prevents an unintended task timeout.
+ // Note: This requires LOST transitions trigger Driver.killTask.
+ StateChangeResult result = storage.write(storeProvider -> stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.of(newState),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE));
+
+ if (result == StateChangeResult.SUCCESS) {
+ LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+ timedOutTasks.incrementAndGet();
+ }
}
} else {
// Our service is not yet started. We don't want to lose track of the task, so
http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
index 1006ddb..9da99c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
@@ -70,7 +70,6 @@ public class TaskTimeoutTest extends EasyMockTest {
public void setUp() {
executor = createMock(DelayExecutor.class);
storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
clock = new FakeClock();
statsProvider = createMock(StatsProvider.class);
@@ -130,24 +129,25 @@ public class TaskTimeoutTest extends EasyMockTest {
public void testTransientToTransient() {
expectTaskWatch();
Capture<Runnable> killingTimeout = expectTaskWatch();
- expect(stateManager.changeState(
- storageUtil.mutableStoreProvider,
- TASK_ID,
- Optional.of(KILLING),
- LOST,
- TaskTimeout.TIMEOUT_MESSAGE))
- .andReturn(StateChangeResult.SUCCESS);
+ expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+ storageUtil.expectRead();
+ storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED));
replayAndCreate();
changeState(PENDING, ASSIGNED);
changeState(ASSIGNED, KILLING);
killingTimeout.getValue().run();
+ assertEquals(0, timedOutTaskCounter.intValue());
}
@Test
public void testTimeout() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
+ expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+ storageUtil.expectRead();
+ storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED));
+ storageUtil.expectWrite();
expect(stateManager.changeState(
storageUtil.mutableStoreProvider,
TASK_ID,
@@ -161,26 +161,26 @@ public class TaskTimeoutTest extends EasyMockTest {
changeState(INIT, PENDING);
changeState(PENDING, ASSIGNED);
assignedTimeout.getValue().run();
- assertEquals(timedOutTaskCounter.intValue(), 1);
+ assertEquals(1, timedOutTaskCounter.intValue());
}
@Test
public void testTaskDeleted() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
- expect(stateManager.changeState(
- storageUtil.mutableStoreProvider,
- TASK_ID,
- Optional.of(KILLING),
- LOST,
- TaskTimeout.TIMEOUT_MESSAGE))
- .andReturn(StateChangeResult.ILLEGAL);
+ expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+ storageUtil.expectRead();
+ storageUtil.expectTaskFetch(TASK_ID);
replayAndCreate();
changeState(INIT, PENDING);
changeState(PENDING, KILLING);
assignedTimeout.getValue().run();
- assertEquals(timedOutTaskCounter.intValue(), 0);
+ assertEquals(0, timedOutTaskCounter.intValue());
+ }
+
+ private static IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+ return makeTask(taskId, status, 0L);
}
private static IScheduledTask makeTask(
@@ -231,6 +231,6 @@ public class TaskTimeoutTest extends EasyMockTest {
changeState(INIT, PENDING);
changeState(PENDING, ASSIGNED);
assignedTimeout.getValue().run();
- assertEquals(timedOutTaskCounter.intValue(), 0);
+ assertEquals(0, timedOutTaskCounter.intValue());
}
}