You are viewing a plain-text version of this content. The link to the canonical version (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/04/21 23:31:52 UTC
aurora git commit: Resuming blocked updates on restart.
Repository: aurora
Updated Branches:
refs/heads/master f55f46743 -> d10d2d171
Resuming blocked updates on restart.
Bugs closed: AURORA-1285
Reviewed at https://reviews.apache.org/r/33374/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d10d2d17
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d10d2d17
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d10d2d17
Branch: refs/heads/master
Commit: d10d2d17142dad405c8b154321931cef59512866
Parents: f55f467
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Apr 21 14:28:27 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Apr 21 14:28:27 2015 -0700
----------------------------------------------------------------------
.../updater/JobUpdateControllerImpl.java | 16 +++--
.../updater/JobUpdateStateMachine.java | 9 ++-
.../aurora/scheduler/updater/JobUpdaterIT.java | 72 ++++++++++++++++++++
3 files changed, 88 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ac15217..1ebfa64 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -83,6 +83,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_BLOCKED_RESUME_STATE;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_PAUSE_STATE;
@@ -270,16 +271,19 @@ class JobUpdateControllerImpl implements JobUpdateController {
IJobUpdateKey key = summary.getKey();
JobUpdateStatus status = summary.getState().getStatus();
- LOG.info("Automatically resuming update " + key);
-
if (isCoordinatedUpdate(instructions)) {
+ LOG.info("Automatically restoring pulse state for " + key);
pulseHandler.initializePulseState(details.getUpdate(), status);
}
- try {
- changeJobUpdateStatus(storeProvider, key, newEvent(status), false);
- } catch (UpdateStateException e) {
- throw Throwables.propagate(e);
+ if (AUTO_RESUME_STATES.contains(status)) {
+ LOG.info("Automatically resuming update " + key);
+
+ try {
+ changeJobUpdateStatus(storeProvider, key, newEvent(status), false);
+ } catch (UpdateStateException e) {
+ throw Throwables.propagate(e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
index 74e915c..1dbab1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.updater;
import java.util.Map;
+import java.util.Set;
import com.google.common.base.Function;
import com.google.common.base.Optional;
@@ -21,8 +22,8 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.JobUpdateStatus;
@@ -97,8 +98,10 @@ final class JobUpdateStateMachine {
ROLL_BACK_AWAITING_PULSE, ROLL_BACK_PAUSED);
static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build(
- new JobUpdateQuery()
- .setUpdateStatuses(ImmutableSet.copyOf(ACTIVE_TO_PAUSED_STATES.keySet())));
+ new JobUpdateQuery().setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
+
+ static final Set<JobUpdateStatus> AUTO_RESUME_STATES =
+ Sets.immutableEnumSet(ACTIVE_TO_PAUSED_STATES.keySet());
private static final Map<JobUpdateStatus, JobUpdateStatus> PAUSE_BEHAVIOR =
ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder()
http://git-wip-us.apache.org/repos/asf/aurora/blob/d10d2d17/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 6be0efa..802c090 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -496,6 +496,78 @@ public class JobUpdaterIT extends EasyMockTest {
}
@Test
+ public void testRecoverAwaitingPulseFromStorage() throws Exception {
+ expectTaskKilled();
+
+ control.replay();
+
+ JobUpdate builder =
+ setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder();
+ builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+ final IJobUpdate update = IJobUpdate.build(builder);
+ insertInitialTasks(update);
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ clock.advance(ONE_DAY);
+
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_AWAITING_PULSE);
+ }
+ });
+
+ subscriber.startAsync().awaitRunning();
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+ assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+ assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+
+ changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+ actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED);
+
+ assertState(ROLLED_FORWARD, actions.build());
+ assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID));
+ }
+
+ @Test
+ public void testRecoverCoordinatedPausedFromStorage() throws Exception {
+ expectTaskKilled();
+
+ control.replay();
+
+ JobUpdate builder =
+ setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder();
+ builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+ final IJobUpdate update = IJobUpdate.build(builder);
+ insertInitialTasks(update);
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ clock.advance(ONE_DAY);
+
+ storage.write(new NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_PAUSED);
+ }
+ });
+
+ subscriber.startAsync().awaitRunning();
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+ assertState(ROLL_FORWARD_PAUSED, actions.build());
+ assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+
+ updater.resume(UPDATE_ID, AUDIT);
+
+ changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+ actions.putAll(0, INSTANCE_UPDATING, INSTANCE_UPDATED);
+
+ assertState(ROLLED_FORWARD, actions.build());
+ assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID));
+ }
+
+ @Test
public void testResumeToAwaitingPulse() throws Exception {
expectTaskKilled().times(2);