You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2016/08/11 20:57:33 UTC
aurora git commit: Add rollback functionality to the scheduler
Repository: aurora
Updated Branches:
refs/heads/master 90846640b -> a4fdf284d
Add rollback functionality to the scheduler
For active job updates in the ROLLING_FORWARD, ROLL_BACK_PAUSED,
ROLL_BACK_AWAITING_PULSE, ROLL_FORWARD_PAUSED, or ROLL_FORWARD_AWAITING_PULSE
state, it is now possible to initiate a rollback by calling the corresponding API
function (rollbackJobUpdate). Rollback is also supported in the aurora CLI tool via
a new command: aurora update rollback CLUSTER/ROLE/ENV/NAME
Bugs closed: AURORA-1721
Reviewed at https://reviews.apache.org/r/50168/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a4fdf284
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a4fdf284
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a4fdf284
Branch: refs/heads/master
Commit: a4fdf284d878c9430f267e2051a1ae97355d0ddb
Parents: 9084664
Author: Igor Morozov <ig...@gmail.com>
Authored: Thu Aug 11 13:56:48 2016 -0700
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Aug 11 13:56:48 2016 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +-
RELEASE-NOTES.md | 2 +
.../thrift/org/apache/aurora/gen/api.thrift | 9 +
.../thrift/SchedulerThriftInterface.java | 8 +
.../thrift/aop/AnnotatedAuroraAdmin.java | 5 +
.../scheduler/updater/JobUpdateController.java | 20 ++
.../updater/JobUpdateControllerImpl.java | 7 +
.../updater/JobUpdateStateMachine.java | 14 +-
.../python/apache/aurora/client/api/__init__.py | 11 +
.../python/apache/aurora/client/cli/update.py | 97 ++++++--
.../updater/JobUpdateStateMachineTest.java | 2 +
.../aurora/scheduler/updater/JobUpdaterIT.java | 220 ++++++++++++++++++-
.../aurora/client/api/test_scheduler_client.py | 5 +
.../apache/aurora/client/cli/test_supdate.py | 86 ++++++++
14 files changed, 459 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fc6a46d..b1dce58 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,11 +1,10 @@
Aurora 0.15.0
--------------------------------------------------------------------------------
## Task
- * [AURORA-1725] - Expose tier configurations as a debug page
+ * [AURORA-1725] - Expose tier configurations as a debug page
* [AURORA-1458] - Add tier into the UI "show config" summary
* [AURORA-1720] - Broken link in http://aurora.apache.org/
-
Aurora 0.14.0
--------------------------------------------------------------------------------
## Bug
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 50f9b83..1819eaa 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -29,6 +29,8 @@
documentation.
- The `ExecutorInfo.source` field is deprecated and has been replaced with a label named `source`.
It will be removed from Mesos in a future release.
+- Add a rollback API to the scheduler and a new client command (`aurora update rollback`) to roll
+ back active job updates to their initial state.
### Deprecations and removals:
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index b799cce..c5765b7 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1106,6 +1106,15 @@ service AuroraSchedulerManager extends ReadOnlyScheduler {
3: string message)
/**
+ * Rolls back the specified active job update to its initial state.
+ */
+ Response rollbackJobUpdate(
+ /** The update to roll back. */
+ 1: JobUpdateKey key,
+ /** A user-specified message to include with the induced job update state change. */
+ 2: string message)
+
+ /**
* Allows progress of the job update in case blockIfNoPulsesAfterMs is specified in
* JobUpdateSettings. Unblocks progress if the update was previously blocked.
* Responds with ResponseCode.INVALID_REQUEST in case an unknown update key is specified.
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b534abf..929d021 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -979,6 +979,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
}
@Override
+ public Response rollbackJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
+ return changeJobUpdateState(
+ mutableKey,
+ JobUpdateController::rollback,
+ Optional.fromNullable(message));
+ }
+
+ @Override
public Response pulseJobUpdate(JobUpdateKey mutableUpdateKey) {
IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey);
try {
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
index 9243c92..bfc3dc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
@@ -95,4 +95,9 @@ public interface AnnotatedAuroraAdmin extends AuroraAdmin.Iface {
@Override
Response pulseJobUpdate(
@AuthorizingParam @Nullable JobUpdateKey key) throws TException;
+
+ @Override
+ Response rollbackJobUpdate(
+ @AuthorizingParam @Nullable JobUpdateKey key,
+ @Nullable String message) throws TException;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index f8357c4..c2ec1b3 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -123,6 +123,26 @@ public interface JobUpdateController {
void abort(IJobUpdateKey key, AuditData auditData) throws UpdateStateException;
/**
+ * Rolls back an active job update.
+ * <p>
+ * This rolls the update back to its initial state, effectively 'undoing' it.
+ * Rollback is possible when the update is in one of the following states:
+ * <ul>
+ * <li>ROLLING_FORWARD</li>
+ * <li>ROLL_BACK_PAUSED</li>
+ * <li>ROLL_BACK_AWAITING_PULSE</li>
+ * <li>ROLL_FORWARD_PAUSED</li>
+ * <li>ROLL_FORWARD_AWAITING_PULSE</li>
+ * </ul>
* i.e. while the update is still active and has not yet reached a terminal state.
+ *
+ * @param key Update to roll back.
+ * @param auditData Details about the origin of this state change.
+ * @throws UpdateStateException If pre-condition is not met.
+ */
+ void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException;
+
+ /**
* Notifies the updater that the state of an instance has changed. A state change could also mean
* deletion.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 364c5c7..594bb62 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -246,6 +246,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
Functions.compose(createAuditedEvent(auditData), Functions.constant(ABORTED)));
}
+ @Override
+ public void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException {
+ unscopedChangeUpdateStatus(
+ key,
+ Functions.compose(createAuditedEvent(auditData), Functions.constant(ROLLING_BACK)));
+ }
+
private static Function<JobUpdateStatus, JobUpdateEvent> createAuditedEvent(
final AuditData auditData) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
index 7ab739a..959a5a5 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
@@ -70,9 +70,19 @@ final class JobUpdateStateMachine {
ABORTED,
ERROR,
FAILED)
- .putAll(ROLL_FORWARD_PAUSED, ROLLING_FORWARD, ROLL_FORWARD_AWAITING_PULSE, ABORTED, ERROR)
+ .putAll(ROLL_FORWARD_PAUSED,
+ ROLLING_BACK,
+ ROLLING_FORWARD,
+ ROLL_FORWARD_AWAITING_PULSE,
+ ABORTED,
+ ERROR)
.putAll(ROLL_BACK_PAUSED, ROLLING_BACK, ROLL_BACK_AWAITING_PULSE, ABORTED, ERROR)
- .putAll(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD, ROLL_FORWARD_PAUSED, ABORTED, ERROR)
+ .putAll(ROLL_FORWARD_AWAITING_PULSE,
+ ROLLING_BACK,
+ ROLLING_FORWARD,
+ ROLL_FORWARD_PAUSED,
+ ABORTED,
+ ERROR)
.putAll(ROLL_BACK_AWAITING_PULSE, ROLLING_BACK, ROLL_BACK_PAUSED, ABORTED, ERROR)
.build();
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index ec2c786..9149c30 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -198,6 +198,17 @@ class AuroraClientAPI(object):
"""
return self._scheduler_proxy.abortJobUpdate(update_key, message)
+ def rollback_job_update(self, update_key, message):
+ """Requests the scheduler to roll back an active job update.
+
+ Arguments:
+ update_key -- Update identifier.
+ message -- Audit message to include with the change.
+
+ Returns response object.
+ """
+ return self._scheduler_proxy.rollbackJobUpdate(update_key, message)
+
def get_job_update_diff(self, config, instances=None):
"""Requests scheduler to calculate difference between scheduler and client job views.
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/cli/update.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/update.py b/src/main/python/apache/aurora/client/cli/update.py
index bb526f7..23aaa2c 100644
--- a/src/main/python/apache/aurora/client/cli/update.py
+++ b/src/main/python/apache/aurora/client/cli/update.py
@@ -76,11 +76,18 @@ class UpdateController(object):
update_key = self.get_update_key(job_key)
if update_key is None:
self.context.print_err("No active update found for this job.")
- return EXIT_INVALID_PARAMETER
+ return EXIT_INVALID_PARAMETER, update_key
resp = mutate_fn(update_key)
self.context.log_response_and_raise(resp, err_code=EXIT_API_ERROR, err_msg=error_msg)
self.context.print_out(success_msg)
- return EXIT_OK
+ return EXIT_OK, update_key
+
+ def rollback(self, job_key, message):
+ return self._modify_update(
+ job_key,
+ lambda key: self.api.rollback_job_update(key, message),
+ "Failed to rollback update due to error:",
+ "Update rollback has started.")
def pause(self, job_key, message):
return self._modify_update(
@@ -103,7 +110,6 @@ class UpdateController(object):
"Failed to abort update due to error:",
"Update has been aborted.")
-
def format_timestamp(stamp_millis):
return datetime.datetime.utcfromtimestamp(stamp_millis / 1000).isoformat()
@@ -116,16 +122,16 @@ MESSAGE_OPTION = CommandOption(
help='Message to include with the update state transition')
+WAIT_OPTION = lambda help_msg: CommandOption(
+ '--wait',
+ default=False,
+ action='store_true',
+ help=help_msg)
+
class StartUpdate(Verb):
UPDATE_MSG_TEMPLATE = "Job update has started. View your update progress at %s"
- WAIT_OPTION = CommandOption(
- '--wait',
- default=False,
- action='store_true',
- help='Wait until the update completes')
-
def __init__(self, clock=time):
self._clock = clock
@@ -143,7 +149,7 @@ class StartUpdate(Verb):
STRICT_OPTION,
INSTANCES_SPEC_ARGUMENT,
CONFIG_ARGUMENT,
- self.WAIT_OPTION
+ WAIT_OPTION('Wait until the update completes')
]
@property
@@ -189,14 +195,26 @@ class StartUpdate(Verb):
context.print_out(self.UPDATE_MSG_TEMPLATE % url)
if context.options.wait:
- return wait_for_update(context, self._clock, api, update_key)
+ return wait_for_update(context, self._clock, api, update_key, update_state_to_err_code)
else:
context.print_out(combine_messages(resp))
return EXIT_OK
-def wait_for_update(context, clock, api, update_key):
+def rollback_state_to_err_code(state):
+ return (EXIT_OK if state == JobUpdateStatus.ROLLED_BACK else
+ EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_FORWARD else
+ EXIT_UNKNOWN_ERROR)
+
+
+def update_state_to_err_code(state):
+ return (EXIT_OK if state == JobUpdateStatus.ROLLED_FORWARD else
+ EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_BACK else
+ EXIT_UNKNOWN_ERROR)
+
+
+def wait_for_update(context, clock, api, update_key, state_to_err_code_func):
cur_state = None
while True:
@@ -209,12 +227,7 @@ def wait_for_update(context, clock, api, update_key):
cur_state = new_state
context.print_out('Current state %s' % JobUpdateStatus._VALUES_TO_NAMES[cur_state])
if cur_state not in ACTIVE_JOB_UPDATE_STATES:
- if cur_state == JobUpdateStatus.ROLLED_FORWARD:
- return EXIT_OK
- elif cur_state == JobUpdateStatus.ROLLED_BACK:
- return EXIT_COMMAND_FAILURE
- else:
- return EXIT_UNKNOWN_ERROR
+ return state_to_err_code_func(cur_state)
clock.sleep(5)
elif len(summaries) == 0:
raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job update not found.')
@@ -252,7 +265,9 @@ class UpdateWait(Verb):
context,
self._clock,
context.get_api(context.options.jobspec.cluster),
- JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id))
+ JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id),
+ update_state_to_err_code
+ )
class PauseUpdate(Verb):
@@ -269,9 +284,10 @@ class PauseUpdate(Verb):
def execute(self, context):
job_key = context.options.jobspec
- return UpdateController(context.get_api(job_key.cluster), context).pause(
+ err_code, _ = UpdateController(context.get_api(job_key.cluster), context).pause(
job_key,
context.options.message)
+ return err_code
class ResumeUpdate(Verb):
@@ -288,9 +304,10 @@ class ResumeUpdate(Verb):
def execute(self, context):
job_key = context.options.jobspec
- return UpdateController(context.get_api(job_key.cluster), context).resume(
+ err_code, _ = UpdateController(context.get_api(job_key.cluster), context).resume(
job_key,
context.options.message)
+ return err_code
class AbortUpdate(Verb):
@@ -307,9 +324,44 @@ class AbortUpdate(Verb):
def execute(self, context):
job_key = context.options.jobspec
- return UpdateController(context.get_api(job_key.cluster), context).abort(
+ err_code, _ = UpdateController(context.get_api(job_key.cluster), context).abort(
job_key,
context.options.message)
+ return err_code
+
+
+class RollbackUpdate(Verb):
+ def __init__(self, clock=time):
+ self._clock = clock
+
+ @property
+ def name(self):
+ return 'rollback'
+
+ def get_options(self):
+ return [JOBSPEC_ARGUMENT, MESSAGE_OPTION, WAIT_OPTION('Wait until the update rolls back')]
+
+ @property
+ def help(self):
+ return 'Rollback an in-progress update.'
+
+ def execute(self, context):
+ job_key = context.options.jobspec
+ update_controller = UpdateController(
+ context.get_api(job_key.cluster), context)
+
+ err_code, update_key = update_controller.rollback(
+ job_key, context.options.message)
+ if err_code == EXIT_OK and context.options.wait:
+ return wait_for_update(
+ context,
+ self._clock,
+ context.get_api(job_key.cluster),
+ update_key,
+ rollback_state_to_err_code
+ )
+
+ return err_code
UpdateFilter = namedtuple('UpdateFilter', ['cluster', 'role', 'env', 'job'])
@@ -554,6 +606,7 @@ class Update(Noun):
self.register_verb(PauseUpdate())
self.register_verb(ResumeUpdate())
self.register_verb(AbortUpdate())
+ self.register_verb(RollbackUpdate())
self.register_verb(ListUpdates())
self.register_verb(UpdateInfo())
self.register_verb(UpdateWait())
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
index 8d78bae..3522dcc 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
@@ -60,11 +60,13 @@ public class JobUpdateStateMachineTest {
.put(Pair.of(ROLL_FORWARD_PAUSED, ROLL_FORWARD_AWAITING_PULSE), STOP_WATCHING)
.put(Pair.of(ROLL_FORWARD_PAUSED, ABORTED), STOP_WATCHING)
.put(Pair.of(ROLL_FORWARD_PAUSED, ERROR), STOP_WATCHING)
+ .put(Pair.of(ROLL_FORWARD_PAUSED, ROLLING_BACK), ROLL_BACK)
.put(Pair.of(ROLL_BACK_PAUSED, ROLLING_BACK), ROLL_BACK)
.put(Pair.of(ROLL_BACK_PAUSED, ROLL_BACK_AWAITING_PULSE), STOP_WATCHING)
.put(Pair.of(ROLL_BACK_PAUSED, ABORTED), STOP_WATCHING)
.put(Pair.of(ROLL_BACK_PAUSED, ERROR), STOP_WATCHING)
.put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD), ROLL_FORWARD)
+ .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_BACK), ROLL_BACK)
.put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLL_FORWARD_PAUSED), STOP_WATCHING)
.put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ABORTED), STOP_WATCHING)
.put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ERROR), STOP_WATCHING)
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index e157c0d..04551f1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -263,6 +263,11 @@ public class JobUpdaterIT extends EasyMockTest {
storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(UPDATE_ID).get());
}
+ private IJobUpdateDetails getDetails(IJobUpdateKey key) {
+ return storage.read(
+ storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key).get());
+ }
+
private void assertLatestUpdateMessage(String expected) {
IJobUpdateDetails details = getDetails();
assertEquals(expected, Iterables.getLast(details.getUpdateEvents()).getMessage());
@@ -272,7 +277,15 @@ public class JobUpdaterIT extends EasyMockTest {
JobUpdateStatus expected,
Multimap<Integer, JobUpdateAction> expectedActions) {
- IJobUpdateDetails details = getDetails();
+ assertStateUpdate(UPDATE_ID, expected, expectedActions);
+ }
+
+ private void assertStateUpdate(
+ IJobUpdateKey key,
+ JobUpdateStatus expected,
+ Multimap<Integer, JobUpdateAction> expectedActions) {
+
+ IJobUpdateDetails details = getDetails(key);
Iterable<IJobInstanceUpdateEvent> orderedEvents =
EVENT_ORDER.sortedCopy(details.getInstanceEvents());
Multimap<Integer, IJobInstanceUpdateEvent> eventsByInstance =
@@ -1365,15 +1378,214 @@ public class JobUpdaterIT extends EasyMockTest {
updater.resume(UPDATE_ID, AUDIT);
}
- private static IJobUpdateSummary makeUpdateSummary() {
+ @Test
+ public void testFailToRollbackCompletedUpdate() throws Exception {
+ expectTaskKilled().times(3);
+
+ control.replay();
+
+ JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
+ builder.getInstructions().getSettings()
+ .setWaitForBatchCompletion(true)
+ .setUpdateGroupSize(2);
+ IJobUpdate update = IJobUpdate.build(builder);
+ insertInitialTasks(update);
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+ // Instances 0 and 1 are updated.
+ updater.start(update, AUDIT);
+ actions.putAll(0, INSTANCE_UPDATING)
+ .putAll(1, INSTANCE_UPDATING);
+ assertState(ROLLING_FORWARD, actions.build());
+ changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
+ changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
+
+ // Instance 1 finished first, but update does not yet proceed until 0 finishes.
+ actions.putAll(1, INSTANCE_UPDATED);
+ assertState(ROLLING_FORWARD, actions.build());
+ clock.advance(WATCH_TIMEOUT);
+ actions.putAll(0, INSTANCE_UPDATED);
+
+ // Instance 2 is updated.
+ changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+ actions.putAll(2, INSTANCE_UPDATING, INSTANCE_UPDATED);
+ assertState(ROLLED_FORWARD, actions.build());
+
+ assertJobState(
+ JOB,
+ ImmutableMap.of(0, NEW_CONFIG, 1, NEW_CONFIG, 2, NEW_CONFIG));
+
+ try {
+ updater.rollback(UPDATE_ID, AUDIT);
+ fail();
+ } catch (UpdateStateException e) {
+ // Expected.
+ }
+ }
+
+ @Test
+ public void testRollbackDuringUpgrade() throws Exception {
+ expectTaskKilled().times(5);
+
+ control.replay();
+
+ JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
+ builder.getInstructions().getSettings()
+ .setWaitForBatchCompletion(true)
+ .setUpdateGroupSize(2);
+ IJobUpdate update = IJobUpdate.build(builder);
+ insertInitialTasks(update);
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+ // Instances 0 and 1 are updated.
+ updater.start(update, AUDIT);
+ actions.putAll(0, INSTANCE_UPDATING)
+ .putAll(1, INSTANCE_UPDATING);
+ assertState(ROLLING_FORWARD, actions.build());
+ changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ actions.putAll(0, INSTANCE_UPDATED)
+ .putAll(1, INSTANCE_UPDATED)
+ .putAll(2, INSTANCE_UPDATING);
+ assertState(ROLLING_FORWARD, actions.build());
+ clock.advance(WATCH_TIMEOUT);
+
+ updater.rollback(UPDATE_ID, AUDIT);
+
+ actions.putAll(1, INSTANCE_ROLLING_BACK);
+ actions.putAll(2, INSTANCE_ROLLING_BACK);
+ changeState(JOB, 1, KILLED);
+ changeState(JOB, 2, KILLED);
+ clock.advance(WATCH_TIMEOUT);
+
+ assertState(ROLLING_BACK, actions.build());
+ clock.advance(WATCH_TIMEOUT);
+
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ actions.putAll(2, INSTANCE_ROLLED_BACK)
+ .putAll(1, INSTANCE_ROLLED_BACK);
+ changeState(JOB, 0, KILLED);
+ actions.putAll(0, INSTANCE_ROLLING_BACK);
+ clock.advance(WATCH_TIMEOUT);
+
+ assertState(ROLLING_BACK, actions.build());
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ actions.putAll(0, INSTANCE_ROLLED_BACK);
+ clock.advance(WATCH_TIMEOUT);
+
+ assertState(ROLLED_BACK, actions.build());
+
+ assertJobState(
+ JOB,
+ ImmutableMap.of(0, OLD_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG));
+ }
+
+ @Test
+ public void testRollbackCoordinatedUpdate() throws Exception {
+ control.replay();
+
+ JobUpdate builder = makeJobUpdate(
+ // No-op - task is already matching the new config.
+ makeInstanceConfig(0, 0, NEW_CONFIG),
+ // Tasks needing update.
+ makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+ builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+ insertInitialTasks(IJobUpdate.build(builder));
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+ updater.start(IJobUpdate.build(builder), AUDIT);
+
+ // The update is blocked initially waiting for a pulse.
+ assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+
+ updater.rollback(UPDATE_ID, AUDIT);
+
+ clock.advance(WATCH_TIMEOUT);
+ assertState(ROLLED_BACK, actions.build());
+ }
+
+ @Test
+ public void testRollbackPausedForwardUpdate() throws Exception {
+ expectTaskKilled().times(2);
+
+ control.replay();
+
+ JobUpdate builder = makeJobUpdate(
+ // No-op - task is already matching the new config.
+ makeInstanceConfig(0, 0, NEW_CONFIG),
+ // Tasks needing update.
+ makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+ insertInitialTasks(IJobUpdate.build(builder));
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+ updater.start(IJobUpdate.build(builder), AUDIT);
+
+ actions.putAll(1, INSTANCE_UPDATING);
+ assertState(ROLLING_FORWARD, actions.build());
+ clock.advance(WATCH_TIMEOUT);
+ changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+
+ updater.pause(UPDATE_ID, AUDIT);
+ assertState(ROLL_FORWARD_PAUSED, actions.build());
+
+ updater.rollback(UPDATE_ID, AUDIT);
+
+ actions.putAll(1, INSTANCE_ROLLING_BACK);
+ clock.advance(WATCH_TIMEOUT);
+ assertState(ROLLING_BACK, actions.build());
+
+ actions.putAll(1, INSTANCE_ROLLED_BACK);
+ changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+ assertState(ROLLED_BACK, actions.build());
+
+ assertJobState(
+ JOB,
+ ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG));
+ }
+
+ private static IJobUpdateSummary makeUpdateSummary(IJobUpdateKey key) {
return IJobUpdateSummary.build(new JobUpdateSummary()
.setUser("user")
- .setKey(UPDATE_ID.newBuilder()));
+ .setKey(key.newBuilder()));
}
private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... configs) {
JobUpdate builder = new JobUpdate()
- .setSummary(makeUpdateSummary().newBuilder())
+ .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder())
.setInstructions(new JobUpdateInstructions()
.setDesiredState(new InstanceTaskConfig()
.setTask(NEW_CONFIG.newBuilder())
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index afbd385..afac250 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -183,6 +183,11 @@ class TestSchedulerProxyInjection(unittest.TestCase):
self.mox.ReplayAll()
self.make_scheduler_proxy().abortJobUpdate('update_id')
+ def test_rollbackJobUpdate(self):
+ self.mock_thrift_client.rollbackJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
+ self.mox.ReplayAll()
+ self.make_scheduler_proxy().rollbackJobUpdate('update_id')
+
def test_pulseJobUpdate(self):
self.mock_thrift_client.pulseJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
self.mox.ReplayAll()
http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/cli/test_supdate.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py b/src/test/python/apache/aurora/client/cli/test_supdate.py
index 317b175..92fb039 100644
--- a/src/test/python/apache/aurora/client/cli/test_supdate.py
+++ b/src/test/python/apache/aurora/client/cli/test_supdate.py
@@ -34,6 +34,7 @@ from apache.aurora.client.cli.update import (
ListUpdates,
PauseUpdate,
ResumeUpdate,
+ RollbackUpdate,
StartUpdate,
UpdateFilter,
UpdateInfo,
@@ -705,3 +706,88 @@ class TestUpdateWait(AuroraClientCommandTest):
assert self._fake_context.get_out() == []
assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call]
+
+
+class TestRollbackUpdate(AuroraClientCommandTest):
+ def setUp(self):
+ self._command = RollbackUpdate()
+ self._mock_options = mock_verb_options(self._command)
+ self._mock_options.jobspec = self.TEST_JOBKEY
+ self._mock_options.wait = False
+ self._fake_context = FakeAuroraCommandContext()
+ self._fake_context.set_options(self._mock_options)
+ self._mock_api = self._fake_context.get_api('UNUSED')
+
+ def test_rollback_update_command_line_succeeds(self):
+ self._mock_api.query_job_updates.return_value = get_status_query_response()
+ self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+ self._mock_options.message = 'hello'
+ assert self._command.execute(self._fake_context) == EXIT_OK
+
+ assert self._mock_api.query_job_updates.mock_calls == [
+ call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+ assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, 'hello')]
+ assert self._fake_context.get_out() == ["Update rollback has started."]
+ assert self._fake_context.get_err() == []
+
+ def test_rollback_update_command_line_error(self):
+ self._mock_api.query_job_updates.return_value = get_status_query_response()
+ self._mock_api.rollback_job_update.return_value = self.create_error_response()
+
+ with pytest.raises(Context.CommandError):
+ self._command.execute(self._fake_context)
+ assert self._mock_api.query_job_updates.mock_calls == [
+ call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+ assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)]
+
+ assert self._fake_context.get_out() == []
+ assert self._fake_context.get_err() == ["Failed to rollback update due to error:", "\tWhoops"]
+
+ def test_rollback_invalid_api_response(self):
+ # Mimic the API returning two active updates for one job, which should be impossible.
+ self._mock_api.query_job_updates.return_value = get_status_query_response(count=2)
+ self._mock_api.rollback_job_update.return_value = self.create_error_response()
+ with pytest.raises(Context.CommandError) as error:
+ self._command.execute(self._fake_context)
+ assert error.message == (
+ 'scheduler returned multiple active updates for this job.')
+
+ assert self._mock_api.query_job_updates.mock_calls == [
+ call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+ assert self._mock_api.rollback_job_update.mock_calls == []
+ assert self._fake_context.get_out() == []
+ assert self._fake_context.get_err() == []
+
+ def test_rollback_and_wait_success(self):
+ self._mock_options.wait = True
+
+ updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+ updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_BACK)
+
+ self._mock_api.query_job_updates.side_effect = [updating_response, updated_response]
+ self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+
+ assert self._command.execute(self._fake_context) == EXIT_OK
+ assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)]
+ assert self._fake_context.get_err() == []
+
+ def test_rollback_and_wait_rolled_forward(self):
+ self._mock_options.wait = True
+
+ updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+ updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_FORWARD)
+
+ self._mock_api.query_job_updates.side_effect = [updating_response, updated_response]
+
+ self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+ assert self._command.execute(self._fake_context) == EXIT_COMMAND_FAILURE
+
+ def test_rollback_and_wait_error(self):
+ self._mock_options.wait = True
+ updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+ failed_response = get_status_query_response(status=JobUpdateStatus.ERROR)
+
+ self._mock_api.query_job_updates.side_effect = [updating_response, failed_response]
+
+ self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+ assert self._command.execute(self._fake_context) == EXIT_UNKNOWN_ERROR