You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") is not included in this plain-text rendering.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/04/21 23:31:52 UTC

aurora git commit: Resuming blocked updates on restart.

Repository: aurora
Updated Branches:
  refs/heads/master f55f46743 -> d10d2d171


Resuming blocked updates on restart.

Bugs closed: AURORA-1285

Reviewed at https://reviews.apache.org/r/33374/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d10d2d17
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d10d2d17
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d10d2d17

Branch: refs/heads/master
Commit: d10d2d17142dad405c8b154321931cef59512866
Parents: f55f467
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Apr 21 14:28:27 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Apr 21 14:28:27 2015 -0700

----------------------------------------------------------------------
 .../updater/JobUpdateControllerImpl.java        | 16 +++--
 .../updater/JobUpdateStateMachine.java          |  9 ++-
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 72 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ac15217..1ebfa64 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -83,6 +83,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_BLOCKED_RESUME_STATE;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_PAUSE_STATE;
@@ -270,16 +271,19 @@ class JobUpdateControllerImpl implements JobUpdateController {
           IJobUpdateKey key = summary.getKey();
           JobUpdateStatus status = summary.getState().getStatus();
 
-          LOG.info("Automatically resuming update " + key);
-
           if (isCoordinatedUpdate(instructions)) {
+            LOG.info("Automatically restoring pulse state for " + key);
             pulseHandler.initializePulseState(details.getUpdate(), status);
           }
 
-          try {
-            changeJobUpdateStatus(storeProvider, key, newEvent(status), false);
-          } catch (UpdateStateException e) {
-            throw Throwables.propagate(e);
+          if (AUTO_RESUME_STATES.contains(status)) {
+            LOG.info("Automatically resuming update " + key);
+
+            try {
+              changeJobUpdateStatus(storeProvider, key, newEvent(status), false);
+            } catch (UpdateStateException e) {
+              throw Throwables.propagate(e);
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
index 74e915c..1dbab1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -21,8 +22,8 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
@@ -97,8 +98,10 @@ final class JobUpdateStateMachine {
           ROLL_BACK_AWAITING_PULSE, ROLL_BACK_PAUSED);
 
   static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build(
-      new JobUpdateQuery()
-          .setUpdateStatuses(ImmutableSet.copyOf(ACTIVE_TO_PAUSED_STATES.keySet())));
+      new JobUpdateQuery().setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
+
+  static final Set<JobUpdateStatus> AUTO_RESUME_STATES =
+      Sets.immutableEnumSet(ACTIVE_TO_PAUSED_STATES.keySet());
 
   private static final Map<JobUpdateStatus, JobUpdateStatus> PAUSE_BEHAVIOR =
       ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder()

http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 6be0efa..802c090 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -496,6 +496,78 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   @Test
+  public void testRecoverAwaitingPulseFromStorage() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    JobUpdate builder =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder();
+    builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+    final IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    clock.advance(ONE_DAY);
+
+    storage.write(new NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_AWAITING_PULSE);
+      }
+    });
+
+    subscriber.startAsync().awaitRunning();
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+    assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED);
+
+    assertState(ROLLED_FORWARD, actions.build());
+    assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID));
+  }
+
+  @Test
+  public void testRecoverCoordinatedPausedFromStorage() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    JobUpdate builder =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder();
+    builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+    final IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    clock.advance(ONE_DAY);
+
+    storage.write(new NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_PAUSED);
+      }
+    });
+
+    subscriber.startAsync().awaitRunning();
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    assertState(ROLL_FORWARD_PAUSED, actions.build());
+    assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+
+    updater.resume(UPDATE_ID, AUDIT);
+
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED);
+
+    assertState(ROLLED_FORWARD, actions.build());
+    assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID));
+  }
+
+  @Test
   public void testResumeToAwaitingPulse() throws Exception {
     expectTaskKilled().times(2);