You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by jc...@apache.org on 2017/01/04 21:51:14 UTC

aurora git commit: Reduce storage write lock contention by adopting Double-Checked Locking pattern in TimedOutTaskHandler.

Repository: aurora
Updated Branches:
  refs/heads/master 21ad18ec7 -> d4ebb56ba


Reduce storage write lock contention by adopting the Double-Checked Locking pattern in
TimedOutTaskHandler.

A `TimedOutTaskHandler` is scheduled for every task each time it transitions to a transient state.
When the handler fires after the timeout period (5 minutes by default), it acquires the storage
write lock to verify whether the task has transitioned out of that transient state.

The verification step takes place while holding the storage write lock. In over 99% of cases the
logic short-circuits and returns from `StateManagerImpl.updateTaskAndExternalState()` once it learns
that the task has already transitioned out of the transient state.

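This patch reduces storage write lock contention by adopting the Double-Checked Locking pattern in
`TimedOutTaskHandler.run()`: the task is first fetched under the storage read lock, and the write
lock is acquired only if the task still exists and is still in the expected transient state. The
state change made under the write lock still passes the expected state, so it remains a
compare-and-set if the task transitions between the two checks.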

Bugs closed: AURORA-1820

Reviewed at https://reviews.apache.org/r/55179/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d4ebb56b
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d4ebb56b
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d4ebb56b

Branch: refs/heads/master
Commit: d4ebb56ba7f02b4f921d37518185af20f253a44f
Parents: 21ad18e
Author: Mehrdad Nurolahzade <me...@nurolahzade.com>
Authored: Wed Jan 4 15:50:46 2017 -0600
Committer: Joshua Cohen <jc...@apache.org>
Committed: Wed Jan 4 15:50:46 2017 -0600

----------------------------------------------------------------------
 .../scheduler/reconciliation/TaskTimeout.java   | 36 ++++++++++++--------
 .../reconciliation/TaskTimeoutTest.java         | 36 ++++++++++----------
 2 files changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index 2dc9bc2..8e9a0d3 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,21 +113,26 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
     @Override
     public void run() {
       if (isRunning()) {
-        // This query acts as a CAS by including the state that we expect the task to be in
-        // if the timeout is still valid.  Ideally, the future would have already been
-        // canceled, but in the event of a state transition race, including transientState
-        // prevents an unintended task timeout.
-        // Note: This requires LOST transitions trigger Driver.killTask.
-        StateChangeResult result = storage.write(storeProvider -> stateManager.changeState(
-            storeProvider,
-            taskId,
-            Optional.of(newState),
-            ScheduleStatus.LOST,
-            TIMEOUT_MESSAGE));
-
-        if (result == StateChangeResult.SUCCESS) {
-          LOG.info("Timeout reached for task " + taskId + ":" + taskId);
-          timedOutTasks.incrementAndGet();
+        Optional<IScheduledTask> task = storage.read(
+            storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+        // Double-Checked Locking: acquire storage write lock only if necessary
+        if (task.isPresent() && task.get().getStatus() == newState) {
+          // This query acts as a CAS by including the state that we expect the task to be in
+          // if the timeout is still valid.  Ideally, the future would have already been
+          // canceled, but in the event of a state transition race, including transientState
+          // prevents an unintended task timeout.
+          // Note: This requires LOST transitions trigger Driver.killTask.
+          StateChangeResult result = storage.write(storeProvider -> stateManager.changeState(
+              storeProvider,
+              taskId,
+              Optional.of(newState),
+              ScheduleStatus.LOST,
+              TIMEOUT_MESSAGE));
+
+          if (result == StateChangeResult.SUCCESS) {
+            LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+            timedOutTasks.incrementAndGet();
+          }
         }
       } else {
         // Our service is not yet started.  We don't want to lose track of the task, so

http://git-wip-us.apache.org/repos/asf/aurora/blob/d4ebb56b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
index 1006ddb..9da99c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
@@ -70,7 +70,6 @@ public class TaskTimeoutTest extends EasyMockTest {
   public void setUp() {
     executor = createMock(DelayExecutor.class);
     storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
     clock = new FakeClock();
     statsProvider = createMock(StatsProvider.class);
@@ -130,24 +129,25 @@ public class TaskTimeoutTest extends EasyMockTest {
   public void testTransientToTransient() {
     expectTaskWatch();
     Capture<Runnable> killingTimeout = expectTaskWatch();
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID,
-        Optional.of(KILLING),
-        LOST,
-        TaskTimeout.TIMEOUT_MESSAGE))
-        .andReturn(StateChangeResult.SUCCESS);
+    expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+    storageUtil.expectRead();
+    storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED));
 
     replayAndCreate();
 
     changeState(PENDING, ASSIGNED);
     changeState(ASSIGNED, KILLING);
     killingTimeout.getValue().run();
+    assertEquals(0, timedOutTaskCounter.intValue());
   }
 
   @Test
   public void testTimeout() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
+    expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+    storageUtil.expectRead();
+    storageUtil.expectTaskFetch(TASK_ID, makeTask(TASK_ID, ASSIGNED));
+    storageUtil.expectWrite();
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
         TASK_ID,
@@ -161,26 +161,26 @@ public class TaskTimeoutTest extends EasyMockTest {
     changeState(INIT, PENDING);
     changeState(PENDING, ASSIGNED);
     assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 1);
+    assertEquals(1, timedOutTaskCounter.intValue());
   }
 
   @Test
   public void testTaskDeleted() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID,
-        Optional.of(KILLING),
-        LOST,
-        TaskTimeout.TIMEOUT_MESSAGE))
-        .andReturn(StateChangeResult.ILLEGAL);
+    expect(storageUtil.storeProvider.getTaskStore()).andReturn(storageUtil.taskStore);
+    storageUtil.expectRead();
+    storageUtil.expectTaskFetch(TASK_ID);
 
     replayAndCreate();
 
     changeState(INIT, PENDING);
     changeState(PENDING, KILLING);
     assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 0);
+    assertEquals(0, timedOutTaskCounter.intValue());
+  }
+
+  private static IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return makeTask(taskId, status, 0L);
   }
 
   private static IScheduledTask makeTask(
@@ -231,6 +231,6 @@ public class TaskTimeoutTest extends EasyMockTest {
     changeState(INIT, PENDING);
     changeState(PENDING, ASSIGNED);
     assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 0);
+    assertEquals(0, timedOutTaskCounter.intValue());
   }
 }