You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this text version.)
Posted to commits@aurora.apache.org by jo...@apache.org on 2018/07/19 21:29:44 UTC

[1/2] aurora git commit: Enable SLA-aware updates

Repository: aurora
Updated Branches:
  refs/heads/master f054e9b10 -> 4e28e73bb


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
index 2c27ec7..efe0c06 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
@@ -14,47 +14,101 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Optional;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask;
+import org.easymock.Capture;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class KillTaskTest extends EasyMockTest {
+  private static final String TASK_ID = "task_id";
+  private static final IJobKey JOB = JobKeys.from("role", "env", "job");
+  private static final IInstanceKey INSTANCE =
+      IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0));
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final SlaPolicy TEST_SLA_POLICY_OLD = SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(95)
+          .setDurationSecs(3600));
+  private static final SlaPolicy TEST_SLA_POLICY_NEW = SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(0)
+          .setDurationSecs(0));
+  private static final ITaskConfig CONFIG_NO_SLA =
+      TaskTestUtil.makeConfig(JOB, true, Optional.empty());
+  private static final ITaskConfig OLD_CONFIG_WITH_SLA =
+      TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_OLD));
+  private static final ITaskConfig NEW_CONFIG_WITH_SLA =
+      TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_NEW));
   private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build(
       new JobUpdateInstructions()
           .setSettings(
               new JobUpdateSettings()
                   .setMinWaitInInstanceRunningMs(1000)));
-  private static final IJobKey JOB = JobKeys.from("role", "env", "job");
-  private static final IInstanceKey INSTANCE =
-      IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0));
-  private static final IJobUpdateKey UPDATE_ID =
-          IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE = IJobUpdateInstructions.build(
+      new JobUpdateInstructions()
+          .setSettings(
+              new JobUpdateSettings()
+                  .setMinWaitInInstanceRunningMs(1000)
+                  .setSlaAware(true))
+          .setInitialState(ImmutableSet.of(new InstanceTaskConfig(
+              OLD_CONFIG_WITH_SLA.newBuilder(),
+              ImmutableSet.of(new Range(0, 0)))))
+          .setDesiredState(new InstanceTaskConfig(
+              NEW_CONFIG_WITH_SLA.newBuilder(),
+              ImmutableSet.of(new Range(0, 0)))));
+  private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE_NO_POLICY
+      = IJobUpdateInstructions.build(
+          new JobUpdateInstructions()
+              .setSettings(
+                  new JobUpdateSettings()
+                      .setMinWaitInInstanceRunningMs(1000)
+                      .setSlaAware(true))
+              .setDesiredState(new InstanceTaskConfig(
+                  CONFIG_NO_SLA.newBuilder(),
+                  ImmutableSet.of(new Range(0, 0)))));
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private InstanceActionHandler handler;
   private UpdateAgentReserver updateAgentReserver;
+  private SlaKillController slaKillController;
 
   @Before
   public void setUp() {
@@ -62,19 +116,19 @@ public class KillTaskTest extends EasyMockTest {
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
     updateAgentReserver = createMock(UpdateAgentReserver.class);
-    handler = new InstanceActionHandler.KillTask(false);
+    handler = new KillTask(false);
+    slaKillController = createMock(SlaKillController.class);
   }
 
   @Test
   public void testInstanceKill() throws Exception {
-    String id = "task_id";
     storageUtil.expectTaskFetch(
         Query.instanceScoped(INSTANCE).active(),
-        TaskTestUtil.makeTask(id, INSTANCE.getJobKey()));
+        TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey()));
 
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
-        id,
+        TASK_ID,
         Optional.empty(),
         ScheduleStatus.KILLING,
         Optional.of("Killed for job update " + UPDATE_ID.getId())))
@@ -89,36 +143,36 @@ public class KillTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test
   public void testKillForUpdateReservesAgentForInstance() throws Exception {
-    String id = "task_id";
-    IScheduledTask task = TaskTestUtil.makeTask(id, INSTANCE.getJobKey(), 1, "agent01");
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
     storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
 
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
-        id,
+        TASK_ID,
         Optional.empty(),
         ScheduleStatus.KILLING,
         Optional.of("Killed for job update " + UPDATE_ID.getId())))
         .andReturn(StateChangeResult.SUCCESS);
-
     updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
     expectLastCall();
 
     control.replay();
 
-    new InstanceActionHandler.KillTask(true).getReevaluationDelay(
+    new KillTask(true).getReevaluationDelay(
         INSTANCE,
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test
@@ -134,6 +188,177 @@ public class KillTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if an instance is killed with the {@code slaAware} option in the instructions,
+   * then the kill is delegated to the {@link SlaKillController} instead of being issued directly.
+   */
+  @Test
+  public void testInstanceKillSlaAware() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey());
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_FORWARD),
+        anyObject());
+    expectLastCall();
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if an instance is killed while {@link JobUpdateStatus#ROLLING_BACK}, the SLA
+   * policy of the old (initial) configuration, rather than that of the desired configuration, is
+   * passed to the {@link SlaKillController}.
+   */
+  @Test
+  public void testInstanceKillSlaAwareRollback() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        eq(INSTRUCTIONS_SLA_AWARE.getInitialState().asList().get(0).getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_BACK),
+        anyObject());
+    expectLastCall();
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_BACK,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensure the correct behavior of the consumer passed into
+   * {@link SlaKillController#slaKill}. It should behave as
+   * {@link KillTask#killAndMaybeReserve}.
+   */
+  @Test
+  public void testInstanceKillSlaAwareKillCommand() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = createCapture();
+    slaKillController.slaKill(
+        eq(storageUtil.mutableStoreProvider),
+        eq(INSTANCE),
+        eq(task),
+        eq(UPDATE_ID),
+        eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
+        eq(JobUpdateStatus.ROLLING_FORWARD),
+        capture(killCommandCapture));
+    expectLastCall();
+
+    // Ensure the correct kill command consumer has been passed in and expected behavior occurs
+    // when executed.
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.empty(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job update " + UPDATE_ID.getId())))
+        .andReturn(StateChangeResult.SUCCESS);
+    updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
+    expectLastCall();
+
+    control.replay();
+
+    new KillTask(true).getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+
+    assertTrue(killCommandCapture.hasCaptured());
+    killCommandCapture.getValue().accept(storageUtil.mutableStoreProvider);
+  }
+
+  /**
+   * Ensures that if an instance is killed with the {@code slaAware} option in the instructions
+   * but the task has no {@link SlaPolicy}, then the kill does not fail but instead falls back to
+   * a regular (non-SLA-aware) kill.
+   */
+  @Test
+  public void testInstanceKillSlaAwareMissingSlaPolicy() throws Exception {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.empty(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job update " + UPDATE_ID.getId())))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS_SLA_AWARE_NO_POLICY,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_FORWARD,
+        UPDATE_ID,
+        slaKillController);
+  }
+
+  /**
+   * Ensures that if SLA-aware kill is called while not in an active state we throw an exception.
+   */
+  @Test
+  public void testInstanceKillSlaAwareBadStatus() {
+    IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    control.replay();
+
+    try {
+      handler.getReevaluationDelay(
+          INSTANCE,
+          INSTRUCTIONS_SLA_AWARE_NO_POLICY,
+          storageUtil.mutableStoreProvider,
+          stateManager,
+          updateAgentReserver,
+          JobUpdateStatus.ROLL_FORWARD_PAUSED,
+          UPDATE_ID,
+          slaKillController);
+      fail();
+    } catch (UpdateStateException e) {
+      // Expected
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
new file mode 100644
index 0000000..373fb83
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/SlaKillControllerTest.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.SlaPolicy;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.sla.SlaManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SlaKillControllerTest extends EasyMockTest {
+
+  private static final ITaskConfig OLD_CONFIG = TaskTestUtil.makeConfig(JOB);
+  private static final SlaPolicy TEST_SLA_POLICY = SlaPolicy.percentageSlaPolicy(
+      new PercentageSlaPolicy()
+          .setPercentage(0)
+          .setDurationSecs(0));
+  private static final ITaskConfig NEW_CONFIG = ITaskConfig.build(
+      TaskTestUtil.makeConfig(JOB).newBuilder().setSlaPolicy(TEST_SLA_POLICY));
+  private static final IScheduledTask TASK = IScheduledTask.build(
+      makeTask("id", OLD_CONFIG).newBuilder().setStatus(ScheduleStatus.RUNNING));
+  private static final IAssignedTask ASSIGNED_TASK = TASK.getAssignedTask();
+  private static final IInstanceKey INSTANCE_KEY = InstanceKeys.from(
+      JOB,
+      ASSIGNED_TASK.getInstanceId());
+  private static final IJobUpdate UPDATE = IJobUpdate.build(
+      new JobUpdate()
+          .setInstructions(new JobUpdateInstructions()
+              .setDesiredState(new InstanceTaskConfig()
+                  .setTask(NEW_CONFIG.newBuilder()))
+              .setSettings(new JobUpdateSettings()
+                  .setSlaAware(true))));
+  private static final IJobUpdateKey UPDATE_ID =
+      IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
+  private static final String KILL_ATTEMPTS_STAT_NAME = SlaKillController.SLA_KILL_ATTEMPT
+      + JobKeys.canonicalString(JOB);
+  private static final String KILL_SUCCESSES_STAT_NAME = SlaKillController.SLA_KILL_SUCCESS
+      + JobKeys.canonicalString(JOB);
+
+  private StorageTestUtil storageUtil;
+  private UpdateActionBatchWorker batchWorker;
+  private SlaManager slaManager;
+  private FakeScheduledExecutor clock;
+  private BackoffStrategy backoffStrategy;
+  private FakeStatsProvider statsProvider;
+  private SlaKillController slaKillController;
+  private CountDownLatch killCommandHasExecuted;
+  private Consumer<Storage.MutableStoreProvider> fakeKillCommand;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    batchWorker = createMock(UpdateActionBatchWorker.class);
+    slaManager = createMock(SlaManager.class);
+    ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executor);
+    backoffStrategy = createMock(BackoffStrategy.class);
+    statsProvider = new FakeStatsProvider();
+    slaKillController = new SlaKillController(
+        executor,
+        batchWorker,
+        slaManager,
+        clock,
+        backoffStrategy,
+        statsProvider);
+    killCommandHasExecuted = new CountDownLatch(2);
+    fakeKillCommand = mutableStoreProvider -> killCommandHasExecuted.countDown();
+  }
+
+  @Test
+  public <T, E extends Exception> void testSlaKill() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        eq(UPDATE_ID),
+        capture(instanceUpdateEventCapture));
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK);
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // Ensure the SLA_CHECKING_MESSAGE message is added
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_UPDATING,
+            SlaKillController.SLA_CHECKING_MESSAGE));
+    instanceUpdateEventCapture.reset();
+    assertFalse(instanceUpdateEventCapture.hasCaptured());
+
+    // Pretend SLA passes, executes work
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+
+    // Ensure the SLA_PASSED_MESSAGE message is added
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_UPDATING,
+            SlaKillController.SLA_PASSED_MESSAGE));
+  }
+
+  /**
+   * Tests that SLA kills are retried, with backoff, when the SLA check does not pass.
+   */
+  @Test
+  public <T, E extends Exception> void testSlaKillRetry() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), anyObject());
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK).times(2);
+    storageUtil.expectTaskFetch(
+        TASK.getAssignedTask().getTaskId(),
+        IScheduledTask.build(TASK.newBuilder().setStatus(ScheduleStatus.KILLING)));
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectLastCall().times(2);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).times(3);
+    expect(backoffStrategy.calculateBackoffMs(0L)).andReturn(42L);
+    expect(backoffStrategy.calculateBackoffMs(42L)).andReturn(84L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertFalse(statsProvider.getAllValues().containsKey(KILL_ATTEMPTS_STAT_NAME));
+    assertFalse(statsProvider.getAllValues().containsKey(KILL_SUCCESSES_STAT_NAME));
+    assertFalse(workCapture.hasCaptured());
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // SLA check is called and discarded, pretending it failed
+    assertEquals(1, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    assertFalse(statsProvider.getAllValues().containsKey(KILL_SUCCESSES_STAT_NAME));
+    assertTrue(workCapture.hasCaptured());
+    workCapture.reset();
+    assertEquals(2, killCommandHasExecuted.getCount());
+    assertFalse(workCapture.hasCaptured());
+
+    // Another SLA kill is scheduled assuming the previous attempt failed
+    assertEquals(1, clock.countDeferredWork());
+    clock.advance(TimeAmount.of(42L, Time.MILLISECONDS));
+
+    // The second SLA check passes and the kill function is called
+    assertEquals(2, killCommandHasExecuted.getCount());
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+    assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME));
+
+    // One more SLA kill is scheduled in case the previous attempt failed. It did not, so this
+    // retry is a NOOP because the task is already KILLING.
+    clock.advance(TimeAmount.of(84L, Time.MILLISECONDS));
+    assertEquals(2, statsProvider.getLongValue(KILL_ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(KILL_SUCCESSES_STAT_NAME));
+    assertEquals(1, killCommandHasExecuted.getCount());
+    assertEquals(0, clock.countDeferredWork());
+  }
+
+  @Test
+  public <T, E extends Exception> void testSlaKillRollingBack() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_BACK, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails))
+        .anyTimes();
+    Capture<IJobInstanceUpdateEvent> instanceUpdateEventCapture = newCapture();
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+        eq(UPDATE_ID),
+        capture(instanceUpdateEventCapture));
+    expectLastCall().times(2);
+    storageUtil.expectTaskFetch(TASK.getAssignedTask().getTaskId(), TASK);
+    Capture<Storage.MutateWork<T, E>> workCapture = createCapture();
+    slaManager.checkSlaThenAct(
+        eq(TASK),
+        eq(ISlaPolicy.build(TEST_SLA_POLICY)),
+        capture(workCapture),
+        eq(ImmutableMap.of()),
+        eq(false));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(42L);
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_BACK,
+        fakeKillCommand);
+
+    // Ensure the SLA_CHECKING_MESSAGE message is added with ROLLING_BACK action
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_ROLLING_BACK,
+            SlaKillController.SLA_CHECKING_MESSAGE));
+    instanceUpdateEventCapture.reset();
+    assertFalse(instanceUpdateEventCapture.hasCaptured());
+
+    // Pretend SLA passes, executes work
+    workCapture.getValue().apply(storageUtil.mutableStoreProvider);
+    assertEquals(1, killCommandHasExecuted.getCount());
+
+    // Ensure the SLA_PASSED_MESSAGE message is added with ROLLING_BACK action
+    assertTrue(
+        checkInstanceEventMatches(
+            instanceUpdateEventCapture.getValue(),
+            INSTANCE_KEY,
+            JobUpdateAction.INSTANCE_ROLLING_BACK,
+            SlaKillController.SLA_PASSED_MESSAGE));
+  }
+
+  @Test
+  public void testSlaKillFailOnPause() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of()));
+    IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails));
+    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(eq(UPDATE_ID), anyObject());
+    expectLastCall();
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(pausedUpdateDetails));
+
+    control.replay();
+
+    // Kill command has not been executed yet
+    assertEquals(2, killCommandHasExecuted.getCount());
+
+    // Start an SLA-aware kill
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+
+    // Nothing should happen since the update is paused by the time the batched kill runs
+    assertEquals(2, killCommandHasExecuted.getCount());
+    assertEquals(0, clock.countDeferredWork());
+    assertFalse(statsProvider.getAllValues().containsKey(KILL_ATTEMPTS_STAT_NAME));
+  }
+
+  @Test
+  public void testSlaKillNoDuplicateEvents() throws Exception {
+    IJobUpdateDetails updateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 123L)),
+            ImmutableList.of(
+                new JobInstanceUpdateEvent()
+                    .setInstanceId(INSTANCE_KEY.getInstanceId())
+                    .setAction(JobUpdateAction.INSTANCE_UPDATING)
+                    .setMessage(SlaKillController.SLA_CHECKING_MESSAGE))));
+    IJobUpdateDetails pausedUpdateDetails = IJobUpdateDetails.build(
+        new JobUpdateDetails(
+            UPDATE.newBuilder(),
+            ImmutableList.of(new JobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 123L)),
+            ImmutableList.of()));
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(updateDetails));
+    expectBatchExecute(batchWorker, storageUtil.storage, control);
+    expect(storageUtil.jobUpdateStore.fetchJobUpdate(UPDATE_ID))
+        .andReturn(Optional.of(pausedUpdateDetails));
+
+    control.replay();
+
+    // Start an SLA-aware kill, update already contains event so we don't expect a save
+    slaKillController.slaKill(
+        storageUtil.mutableStoreProvider,
+        INSTANCE_KEY,
+        TASK,
+        UPDATE_ID,
+        ISlaPolicy.build(TEST_SLA_POLICY),
+        JobUpdateStatus.ROLLING_FORWARD,
+        fakeKillCommand);
+  }
+
+  @Test
+  public void testSlaKillInvalidStatus() {
+    control.replay();
+
+    // Start an SLA-aware kill, throws an exception since the kill was called while the update
+    // was not active
+    try {
+      slaKillController.slaKill(
+          storageUtil.mutableStoreProvider,
+          INSTANCE_KEY,
+          TASK,
+          UPDATE_ID,
+          ISlaPolicy.build(TEST_SLA_POLICY),
+          JobUpdateStatus.ROLL_FORWARD_PAUSED,
+          fakeKillCommand);
+    } catch (RuntimeException e) {
+      return;
+    }
+
+    fail();
+  }
+
+  private boolean checkInstanceEventMatches(IJobInstanceUpdateEvent event,
+                                            IInstanceKey instance,
+                                            JobUpdateAction action,
+                                            String message) {
+
+    return event.getInstanceId() == instance.getInstanceId()
+        && event.getAction() == action
+        && event.getMessage().equals(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/python/apache/aurora/client/cli/test_inspect.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_inspect.py b/src/test/python/apache/aurora/client/cli/test_inspect.py
index 2baba2a..58bcac1 100644
--- a/src/test/python/apache/aurora/client/cli/test_inspect.py
+++ b/src/test/python/apache/aurora/client/cli/test_inspect.py
@@ -112,7 +112,8 @@ Process 'process':
             "watch_secs": 45,
             "rollback_on_failure": True,
             "max_per_shard_failures": 0,
-            "max_total_failures": 0},
+            "max_total_failures": 0,
+            "sla_aware": False},
         "name": "the_job",
         "max_task_failures": 1,
         "cron_collision_policy": "KILL_EXISTING",

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
index 48902d3..8360998 100644
--- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
+++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobInstanceUpdateEvent
@@ -11,6 +11,9 @@
           },
           "3": {
             "i32": 1
+          },
+          "4": {
+            "str": "string-value"
           }
         }
       },

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
index 08dfa5b..3876767 100644
--- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
+++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
@@ -485,6 +485,9 @@
                   },
                   "9": {
                     "i32": 2
+                  },
+                  "10": {
+                    "tf": 1
                   }
                 }
               }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateInstanceEvents.js
----------------------------------------------------------------------
diff --git a/ui/src/main/js/components/UpdateInstanceEvents.js b/ui/src/main/js/components/UpdateInstanceEvents.js
index 8351f2c..ab6d3df 100644
--- a/ui/src/main/js/components/UpdateInstanceEvents.js
+++ b/ui/src/main/js/components/UpdateInstanceEvents.js
@@ -30,7 +30,8 @@ export class InstanceEvent extends React.Component {
           getClassForUpdateAction(e.action),
           (i === events.length - 1) ? ' active' : ''),
         state: UPDATE_ACTION[e.action],
-        timestamp: e.timestampMs
+        timestamp: e.timestampMs,
+        message: e.message
       };
     });
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/ui/src/main/js/components/UpdateSettings.js
----------------------------------------------------------------------
diff --git a/ui/src/main/js/components/UpdateSettings.js b/ui/src/main/js/components/UpdateSettings.js
index d756f59..d7fbe00 100644
--- a/ui/src/main/js/components/UpdateSettings.js
+++ b/ui/src/main/js/components/UpdateSettings.js
@@ -25,6 +25,10 @@ export default function UpdateSettings({ update }) {
         <td>Rollback On Failure?</td>
         <td>{settings.rollbackOnFailure ? 'yes' : 'no'}</td>
       </tr>
+      <tr>
+        <td>SLA-Aware?</td>
+        <td>{settings.slaAware ? 'yes' : 'no'}</td>
+      </tr>
     </table>
   </div>);
 }


[2/2] aurora git commit: Enable SLA-aware updates

Posted by jo...@apache.org.
Enable SLA-aware updates

This patch enables SLA-aware updates.

Following https://reviews.apache.org/r/66716/, tasks may now specify custom SLA
policies that will be respected by the scheduler during maintenance. This patch
integrates into the same system to allow users to specify if they want their
updates to also respect SLA. Please see
https://docs.google.com/document/d/1lCoDyoX26qrGrptrgO7vJHqYR_L2CBRGFIywsAd8uQo/edit?usp=sharing
for a more detailed description.

This patch adds two optional Thrift fields, `slaAware` to `JobUpdateSettings`
and `message` to `JobInstanceUpdateEvent`. These should be both forward- and
backward-compatible.

Reviewed at https://reviews.apache.org/r/67696/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e28e73b
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e28e73b
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e28e73b

Branch: refs/heads/master
Commit: 4e28e73bb83b75f1369a705d15c96e40fb7098e1
Parents: f054e9b
Author: Jordan Ly <jo...@gmail.com>
Authored: Thu Jul 19 14:28:40 2018 -0700
Committer: Jordan Ly <jl...@twitter.com>
Committed: Thu Jul 19 14:28:40 2018 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |  14 +
 .../thrift/org/apache/aurora/gen/api.thrift     |  21 +-
 docs/features/job-updates.md                    |  13 +
 docs/reference/configuration.md                 |  29 ++
 docs/reference/scheduler-configuration.md       |   6 +
 .../org/apache/aurora/scheduler/base/Tasks.java |   6 +
 .../scheduler/pruning/TaskHistoryPruner.java    |   2 +-
 .../thrift/SchedulerThriftInterface.java        |   8 +
 .../updater/InstanceActionHandler.java          |  97 +++-
 .../scheduler/updater/InstanceUpdater.java      |   5 +-
 .../updater/JobUpdateControllerImpl.java        |  59 ++-
 .../scheduler/updater/OneWayJobUpdater.java     |   3 +-
 .../scheduler/updater/SlaKillController.java    | 341 ++++++++++++++
 .../aurora/scheduler/updater/UpdateFactory.java |  18 +-
 .../aurora/scheduler/updater/UpdaterModule.java |  58 ++-
 .../aurora/scheduler/updater/Updates.java       |  31 ++
 .../apache/aurora/client/api/updater_util.py    |   8 +-
 .../python/apache/aurora/config/schema/base.py  |   1 +
 .../aurora/scheduler/base/TaskTestUtil.java     |   8 +-
 .../scheduler/config/CommandLineTest.java       |   6 +
 .../storage/AbstractJobUpdateStoreTest.java     |   4 +-
 .../testing/FakeScheduledExecutor.java          |   4 +
 .../thrift/SchedulerThriftInterfaceTest.java    |  46 +-
 .../aurora/scheduler/updater/AddTaskTest.java   |  11 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 312 ++++++++++++-
 .../aurora/scheduler/updater/KillTaskTest.java  | 259 ++++++++++-
 .../updater/SlaKillControllerTest.java          | 449 +++++++++++++++++++
 .../apache/aurora/client/cli/test_inspect.py    |   3 +-
 .../goldens/current/saveJobInstanceUpdateEvent  |   3 +
 .../durability/goldens/current/saveJobUpdate    |   3 +
 .../main/js/components/UpdateInstanceEvents.js  |   3 +-
 ui/src/main/js/components/UpdateSettings.js     |   4 +
 32 files changed, 1737 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index edc081f..e6edbde 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -21,6 +21,20 @@
 
   Note: The `Coordinator` interface required for the `CoordinatorSlaPolicy` is experimental at
   this juncture and is bound to change in the future.
+- Introduced the ability for updates to be 'SLA-aware', i.e. to only update an instance if doing so
+  keeps the task within its SLA, using the new `sla_aware` field in `UpdateConfig`. See the bullet
+  point above for an explanation of custom SLA requirements.
+
+  **NOTE**: SLA-aware updates will use the desired config's SLA, not the existing config's. If the
+  update is rolled back, the rollback uses the original config's SLA.
+
+  Three additional scheduler options have been added to support this feature:
+
+    1. `sla_aware_action_max_batch_size (default: 300)`: the maximum number of SLA-aware update
+    actions to process in a batch.
+    2. `sla_aware_kill_retry_min_delay (default: 1mins)`: the minimum amount of time to wait before
+    retrying an SLA-aware kill (using a truncated binary backoff).
+    3. `sla_aware_kill_retry_max_delay (default: 5mins)`: the maximum amount of time to wait before
+    retrying an SLA-aware kill (using a truncated binary backoff).
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 653da94..dac2267 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -742,13 +742,19 @@ struct JobUpdateSettings {
    */
   8: bool waitForBatchCompletion
 
- /**
-  * If set, requires external calls to pulseJobUpdate RPC within the specified rate for the
-  * update to make progress. If no pulses received within specified interval the update will
-  * block. A blocked update is unable to continue but retains its current status. It may only get
-  * unblocked by a fresh pulseJobUpdate call.
-  */
+  /**
+   * If set, requires external calls to pulseJobUpdate RPC within the specified rate for the
+   * update to make progress. If no pulses received within specified interval the update will
+   * block. A blocked update is unable to continue but retains its current status. It may only get
+   * unblocked by a fresh pulseJobUpdate call.
+   */
   9: optional i32 blockIfNoPulsesAfterMs
+
+  /**
+   * If true, updates will obey the SLA requirements of the tasks being updated. If the SLA policy
+   * differs between the old and new task configurations, updates will use the newest configuration.
+   */
+  10: optional bool slaAware
 }
 
 /** Event marking a state transition in job update lifecycle. */
@@ -779,6 +785,9 @@ struct JobInstanceUpdateEvent {
 
   /** Job update action taken on the instance. */
   3: JobUpdateAction action
+
+  /** Optional message explaining the instance update event. */
+  4: optional string message
 }
 
 /** Maps instance IDs to TaskConfigs it. */

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/docs/features/job-updates.md
----------------------------------------------------------------------
diff --git a/docs/features/job-updates.md b/docs/features/job-updates.md
index b52eb35..bcf5052 100644
--- a/docs/features/job-updates.md
+++ b/docs/features/job-updates.md
@@ -84,6 +84,19 @@ progress until the first pulse arrives. However, a paused update (`ROLL_FORWARD_
 provided the pulse interval has not expired.
 
 
+SLA-Aware Updates
+-----------------
+
+Updates can take advantage of [Custom SLA Requirements](../features/sla-requirements.md) and
+specify the `sla_aware=True` option within
+[UpdateConfig](../reference/configuration.md#updateconfig-objects) to only update instances if
+the action will maintain the task's SLA requirements. This feature allows updates to avoid killing
+too many instances in the face of unexpected failures outside of the update range.
+
+See [Using the `sla_aware` option](../reference/configuration.md#using-the-sla-aware-option)
+for more information on how to use this feature.
+
+
 Canary Deployments
 ------------------
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index acab4c5..0632559 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -376,6 +376,35 @@ Parameters for controlling the rate and policy of rolling updates.
 | ```rollback_on_failure```    | boolean  | When False, prevents auto rollback of a failed update (Default: True)
 | ```wait_for_batch_completion```| boolean | When True, all threads from a given batch will be blocked from picking up new instances until the entire batch is updated. This essentially simulates the legacy sequential updater algorithm. (Default: False)
 | ```pulse_interval_secs```    | Integer  |  Indicates a [coordinated update](../features/job-updates.md#coordinated-job-updates). If no pulses are received within the provided interval the update will be blocked. Beta-updater only. Will fail on submission when used with client updater. (Default: None)
+| ```sla_aware```              | boolean  | When True, updates will only update an instance if it does not break the task's specified [SLA Requirements](../features/sla-requirements.md). (Default: False)
+
+#### Using the `sla_aware` option
+
+There are some nuances around the `sla_aware` option that users should be aware of:
+
+- SLA-aware updates work in tandem with maintenance. Draining a host that has an instance of the
+job being updated affects the SLA and thus will be taken into account when the update determines
+whether or not it is safe to update another instance.
+- SLA-aware updates will use the [SLAPolicy](../features/sla-requirements.md#custom-sla) of the
+*newest* configuration when determining whether or not it is safe to update an instance. For
+example, if the current configuration specifies a
+[PercentageSlaPolicy](../features/sla-requirements.md#percentageslapolicy-objects) that allows for
+5% of instances to be down and the updated configuration increases this value to 10%, the SLA
+calculation will be done using the 10% policy. Be mindful of this when doing an update that
+modifies the `SLAPolicy` since it may be possible to put the old configuration in a bad state
+that the new configuration would not be affected by. Additionally, if the update is rolled back,
+then the rollback will use the old `SLAPolicy` (or none if there was not one previously).
+- If using the [CoordinatorSlaPolicy](../features/sla-requirements.md#coordinatorslapolicy-objects),
+it is important to pay attention to the `batch_size` of the update. If you have a complex SLA
+requirement, then you may be limiting the throughput of your updates with an insufficient
+`batch_size`. For example, imagine you have a job with 9 instances that represent three
+replicated caches, and you can only update one instance per replica set at a time: `[0 1 2]
+[3 4 5] [6 7 8]` (the number indicates the instance ID and the brackets represent replica
+sets). If your `batch_size` is 3, then you will slowly update one replica set at a time. If your
+`batch_size` is 9, then you can update all replica sets in parallel, thus speeding up the update.
+- If an instance fails an SLA check for an update, it will be rechecked after a delay that starts
+at `sla_aware_kill_retry_min_delay` and increases exponentially up to
+`sla_aware_kill_retry_max_delay`. These values are set by the cluster operator.
 
 ### HealthCheckConfig Objects
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 805e516..3f8cba8 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -222,6 +222,12 @@ Optional flags:
 	Path to shiro.ini for authentication and authorization configuration.
 -shiro_realm_modules (default [class org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule])
 	Guice modules for configuring Shiro Realms.
+-sla_aware_action_max_batch_size (default 300) [must be > 0]
+	The maximum number of sla aware update actions that can be processed in a batch.
+-sla_aware_kill_retry_min_delay (default (1, min)) [must be > 0]
+	The minimum amount of time to wait before retrying an SLA-aware kill (using a truncated binary backoff).
+-sla_aware_kill_retry_max_delay (default (5, min)) [must be > 0]
+	The maximum amount of time to wait before retrying an SLA-aware kill (using a truncated binary backoff).
 -sla_coordinator_timeout (default (1, min)) [must be > 0]
 	Timeout interval for communicating with Coordinator.
 -sla_non_prod_metrics (default [])

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 2e13aac..47cd855 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -35,6 +35,8 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+
 /**
  * Utility class providing convenience functions relating to tasks.
  */
@@ -102,6 +104,10 @@ public final class Tasks {
     return ACTIVE_STATES.contains(status);
   }
 
+  /**
+   * Returns whether a task in the given status is considered killable: the status must be
+   * active (per {@code isActive}) and must not already be {@link ScheduleStatus#KILLING}.
+   *
+   * @param status Task status to check.
+   * @return {@code true} if {@code status} is active and is not {@code KILLING}.
+   */
+  public static boolean isKillable(ScheduleStatus status) {
+    return Tasks.isActive(status) && status != KILLING;
+  }
+
   public static boolean isTerminated(ScheduleStatus status) {
     return TERMINAL_STATES.contains(status);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 9aa51c3..06618af 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -142,7 +142,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   }
 
   private void deleteTasks(final Set<String> taskIds) {
-    LOG.info("Pruning inactive tasks " + taskIds);
+    LOG.debug("Pruning inactive tasks {}", taskIds);
     batchWorker.execute(storeProvider -> {
       stateManager.deleteTasks(storeProvider, taskIds);
       return BatchWorker.NO_RESULT;

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 6a28bc2..74820c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -818,6 +818,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
       return invalidRequest(INVALID_PULSE_TIMEOUT);
     }
 
+    if (settings.isSlaAware() && !mutableRequest.getTaskConfig().isSetSlaPolicy()) {
+      return invalidRequest(INVALID_SLA_AWARE_UPDATE);
+    }
+
     IJobUpdateRequest request;
     try {
       request = IJobUpdateRequest.build(
@@ -1071,4 +1075,8 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
 
   @VisibleForTesting
   static final String INVALID_INSTANCE_COUNT = "Instance count must be positive.";
+
+  @VisibleForTesting
+  static final String INVALID_SLA_AWARE_UPDATE = "slaAware is true, but no task slaPolicy is "
+      + "specified.";
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
index 9fa68b2..d9c7e0a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -32,12 +32,15 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.updater.Updates.getConfig;
 
 interface InstanceActionHandler {
 
@@ -48,7 +51,8 @@ interface InstanceActionHandler {
       StateManager stateManager,
       UpdateAgentReserver reserver,
       JobUpdateStatus status,
-      IJobUpdateKey key);
+      IJobUpdateKey key,
+      SlaKillController slaKillController) throws UpdateStateException;
 
   Logger LOG = LoggerFactory.getLogger(InstanceActionHandler.class);
 
@@ -90,7 +94,8 @@ interface InstanceActionHandler {
         StateManager stateManager,
         UpdateAgentReserver reserver,
         JobUpdateStatus status,
-        IJobUpdateKey key) {
+        IJobUpdateKey key,
+        SlaKillController slaKillController) {
 
       Optional<IScheduledTask> task = getExistingTask(storeProvider, instance);
       if (task.isPresent()) {
@@ -128,19 +133,38 @@ interface InstanceActionHandler {
         StateManager stateManager,
         UpdateAgentReserver reserver,
         JobUpdateStatus status,
-        IJobUpdateKey key) {
+        IJobUpdateKey key,
+        SlaKillController slaKillController) throws UpdateStateException {
 
       Optional<IScheduledTask> task = getExistingTask(storeProvider, instance);
       if (task.isPresent()) {
-        LOG.info("Killing " + instance + " while " + status);
-        stateManager.changeState(
-            storeProvider,
-            Tasks.id(task.get()),
-            Optional.empty(),
-            ScheduleStatus.KILLING,
-            Optional.of("Killed for job update " + key.getId()));
-        if (reserveForReplacement && task.get().getAssignedTask().isSetSlaveId()) {
-          reserver.reserve(task.get().getAssignedTask().getSlaveId(), instance);
+        Optional<ISlaPolicy> slaPolicy = getSlaPolicy(instance, status, instructions);
+        if (instructions.getSettings().isSlaAware() && slaPolicy.isPresent()) {
+          slaKillController.slaKill(
+              storeProvider,
+              instance,
+              task.get(),
+              key,
+              slaPolicy.get(),
+              status,
+              (MutableStoreProvider slaStoreProvider) -> killAndMaybeReserve(
+                  instance,
+                  slaStoreProvider,
+                  stateManager,
+                  reserver,
+                  status,
+                  key,
+                  task.get()));
+        } else {
+          killAndMaybeReserve(
+              instance,
+              storeProvider,
+              stateManager,
+              reserver,
+              status,
+              key,
+              task.get()
+          );
         }
       } else {
         // Due to async event processing it's possible to have a race between task event
@@ -150,6 +174,52 @@ interface InstanceActionHandler {
       // A task state transition will trigger re-evaluation in this case, rather than a timer.
       return Optional.empty();
     }
+
+    /**
+     * Kills the task backing {@code instance} and, if configured, reserves its agent.
+     *
+     * <p>Moves the task to {@link ScheduleStatus#KILLING} through the state manager, with an
+     * audit message naming the update ID. If {@code reserveForReplacement} is set and the task
+     * has an assigned agent, that agent is reserved for {@code instance} (presumably so the
+     * replacement task can be placed there). This is called directly for non-SLA-aware kills and
+     * is also the kill callback handed to {@code SlaKillController.slaKill}.
+     *
+     * @param instance Instance whose task is being killed.
+     * @param storeProvider Store provider used for the state change.
+     * @param stateManager State manager that performs the transition to KILLING.
+     * @param reserver Reserver used to hold the task's agent for the instance.
+     * @param status Current update status (only used for logging).
+     * @param key Update key; its ID is included in the kill audit message.
+     * @param task Task to kill.
+     */
+    private void killAndMaybeReserve(
+        IInstanceKey instance,
+        MutableStoreProvider storeProvider,
+        StateManager stateManager,
+        UpdateAgentReserver reserver,
+        JobUpdateStatus status,
+        IJobUpdateKey key,
+        IScheduledTask task) {
+
+      LOG.info("Killing " + instance + " while " + status);
+      stateManager.changeState(
+          storeProvider,
+          Tasks.id(task),
+          Optional.empty(),
+          ScheduleStatus.KILLING,
+          Optional.of("Killed for job update " + key.getId()));
+      if (reserveForReplacement && task.getAssignedTask().isSetSlaveId()) {
+        reserver.reserve(task.getAssignedTask().getSlaveId(), instance);
+      }
+    }
+
+    /**
+     * Get the SLA policy that should be used to kill a task for an update. If the update is
+     * {@link JobUpdateStatus#ROLLING_FORWARD}, then we use the config we are updating to. If the
+     * update is {@link JobUpdateStatus#ROLLING_BACK}, then we use the config of the initial state.
+     *
+     * @param instance Instance whose task is about to be killed.
+     * @param status Current update status; expected to be ROLLING_FORWARD or ROLLING_BACK.
+     * @param instructions Update instructions holding the desired and initial configurations.
+     * @return The SLA policy of the relevant config. Empty if that config has no SLA policy or,
+     *     when rolling back, if {@code getConfig} finds no initial config for the instance.
+     * @throws UpdateStateException If {@code status} is neither ROLLING_FORWARD nor ROLLING_BACK.
+     */
+    private Optional<ISlaPolicy> getSlaPolicy(
+        IInstanceKey instance,
+        JobUpdateStatus status,
+        IJobUpdateInstructions instructions) throws UpdateStateException {
+
+      if (status == ROLLING_FORWARD) {
+        return Optional.ofNullable(instructions.getDesiredState().getTask().getSlaPolicy());
+      } else if (status == ROLLING_BACK) {
+        return getConfig(instance.getInstanceId(), instructions.getInitialState())
+            .map(ITaskConfig::getSlaPolicy);
+      } else {
+        // This should not happen since callers check the update status before calling this
+        // method, but we throw an exception just in case.
+        LOG.error("Attempted to perform an SLA-aware kill on instance {} while update is not "
+            + "in an active state (it is in state {})", instance, status);
+        throw new UpdateStateException("Attempted to perform an instance update action while not "
+            + "in an active state.");
+      }
+    }
   }
 
   class WatchRunningTask implements InstanceActionHandler {
@@ -161,7 +231,8 @@ interface InstanceActionHandler {
         StateManager stateManager,
         UpdateAgentReserver reserver,
         JobUpdateStatus status,
-        IJobUpdateKey key) {
+        IJobUpdateKey key,
+        SlaKillController slaKillController) {
 
       return Optional.of(Amount.of(
           (long) instructions.getSettings().getMinWaitInInstanceRunningMs(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index a002d95..af77fa4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -31,6 +31,7 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.base.Tasks.isKillable;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
@@ -78,10 +79,6 @@ class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
     return task.getStatus() != KILLING && wasKilling;
   }
 
-  private static boolean isKillable(ScheduleStatus status) {
-    return Tasks.isActive(status) && status != KILLING;
-  }
-
   private static boolean isTaskPresent(Optional<IScheduledTask> task) {
     return task.isPresent() && !isPermanentlyKilled(task.get());
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ec577cc..45b4dc6 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -124,8 +124,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final Clock clock;
   private final PulseHandler pulseHandler;
   private final Lifecycle lifecycle;
-  private final TaskEventBatchWorker batchWorker;
+  private final TaskEventBatchWorker taskEventBatchWorker;
   private final UpdateAgentReserver updateAgentReserver;
+  private final SlaKillController slaKillController;
 
   // Currently-active updaters. An active updater is one that is rolling forward or back. Paused
   // and completed updates are represented only in storage, not here.
@@ -144,8 +145,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
       UpdateAgentReserver updateAgentReserver,
       Clock clock,
       Lifecycle lifecycle,
-      TaskEventBatchWorker batchWorker,
-      StatsProvider statsProvider) {
+      TaskEventBatchWorker taskEventBatchWorker,
+      StatsProvider statsProvider,
+      SlaKillController slaKillController) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.storage = requireNonNull(storage);
@@ -153,9 +155,10 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.stateManager = requireNonNull(stateManager);
     this.clock = requireNonNull(clock);
     this.lifecycle = requireNonNull(lifecycle);
-    this.batchWorker = requireNonNull(batchWorker);
+    this.taskEventBatchWorker = requireNonNull(taskEventBatchWorker);
     this.pulseHandler = new PulseHandler(clock);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
+    this.slaKillController = requireNonNull(slaKillController);
 
     this.jobUpdateEventStats = CacheBuilder.newBuilder()
         .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
@@ -265,9 +268,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
       }
 
       IJobUpdate update = details.get().getUpdate();
-      IJobUpdateKey key1 = update.getSummary().getKey();
       Function<JobUpdateStatus, JobUpdateStatus> stateChange =
-          isCoordinatedAndPulseExpired(key1, update.getInstructions())
+          isCoordinatedAndPulseExpired(key, update.getInstructions())
               ? GET_BLOCKED_RESUME_STATE
               : GET_ACTIVE_RESUME_STATE;
 
@@ -403,20 +405,28 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
-    batchWorker.execute(storeProvider -> {
+    taskEventBatchWorker.execute(storeProvider -> {
       IJobKey job = instance.getJobKey();
       UpdateFactory.Update update = updates.get(job);
       if (update != null) {
         if (update.getUpdater().containsInstance(instance.getInstanceId())) {
-          LOG.info("Forwarding task change for " + InstanceKeys.toString(instance));
-          try {
-            evaluateUpdater(
-                storeProvider,
-                update,
-                getOnlyMatch(storeProvider.getJobUpdateStore(), queryActiveByJob(job)),
-                ImmutableMap.of(instance.getInstanceId(), state));
-          } catch (UpdateStateException e) {
-            throw new RuntimeException(e);
+          // We check to see if the state change is specified, and if it is, ensure that the new
+          // state matches the current state. We do this because events are processed asynchronously
+          // and it is possible for an old event trigger an action that should not be triggered
+          // for the actual updated state.
+          if (!state.isPresent() || isLatestState(storeProvider, state.get())) {
+            LOG.info("Forwarding task change for " + InstanceKeys.toString(instance));
+            try {
+              evaluateUpdater(
+                  storeProvider,
+                  update,
+                  getOnlyMatch(storeProvider.getJobUpdateStore(), queryActiveByJob(job)),
+                  ImmutableMap.of(instance.getInstanceId(), state));
+            } catch (UpdateStateException e) {
+              throw new RuntimeException(e);
+            }
+          } else {
+            LOG.info("Ignoring out of date task change for " + instance);
           }
         } else {
           LOG.info("Instance " + instance + " is not part of active update for "
@@ -427,6 +437,20 @@ class JobUpdateControllerImpl implements JobUpdateController {
     });
   }
 
+  /**
+   * Returns true only if the task referenced by {@code reportedState} is still in the task store
+   * and its stored status equals the reported one; a task that no longer exists yields false.
+   */
+  private boolean isLatestState(MutableStoreProvider storeProvider, IScheduledTask reportedState) {
+    String taskId = reportedState.getAssignedTask().getTaskId();
+
+    return storeProvider
+        .getTaskStore()
+        .fetchTask(taskId)
+        .filter(stored -> stored.getStatus() == reportedState.getStatus())
+        .isPresent();
+  }
+
   private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery query) {
     return Iterables.getOnlyElement(store.fetchJobUpdates(query)).getUpdate().getSummary();
   }
@@ -690,7 +714,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
                 stateManager,
                 updateAgentReserver,
                 updaterStatus,
-                key);
+                key,
+                slaKillController);
             if (reevaluateDelay.isPresent()) {
               executor.schedule(
                   getDeferredEvaluator(instance, key),

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
index f2d33fb..64720dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
@@ -179,8 +179,7 @@ class OneWayJobUpdater<K, T> {
         for (K instance : nextGroup) {
           builder.put(instance, instances.get(instance).evaluate(stateProvider.getState(instance)));
         }
-        LOG.info("Changed working set for update to "
-            + filterByStatus(instances, WORKING));
+        LOG.debug("Changed working set for update to " + filterByStatus(instances, WORKING));
       }
 
       Map<K, SideEffect> sideEffects = builder.build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/SlaKillController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/SlaKillController.java b/src/main/java/org/apache/aurora/scheduler/updater/SlaKillController.java
new file mode 100644
index 0000000..854db1a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/SlaKillController.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.sla.SlaManager;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
+import org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES;
+import static org.apache.aurora.scheduler.base.Tasks.isKillable;
+
+/**
+ * This class helps push SLA-aware kills to completion.
+ *
+ * SLA-aware kills will be added to a queue and retried until SLA passes or the update changes
+ * state. If the update becomes PAUSED or AWAITING_PULSE, then the retries will stop until the
+ * update becomes active again.
+ */
+class SlaKillController {
+  private static final Logger LOG = LoggerFactory.getLogger(SlaKillController.class);
+
+  @VisibleForTesting
+  static final String SLA_CHECKING_MESSAGE = "Checking SLA before continuing.";
+  @VisibleForTesting
+  static final String SLA_PASSED_MESSAGE = "SLA check passed, continuing.";
+  @VisibleForTesting
+  static final String SLA_KILL_ATTEMPT = "updates_sla_kill_attempt_";
+  @VisibleForTesting
+  static final String SLA_KILL_SUCCESS = "updates_sla_kill_success_";
+
+  private final ScheduledExecutorService executor;
+  private final UpdateActionBatchWorker batchWorker;
+  private final SlaManager slaManager;
+  private final BackoffStrategy backoffStrategy;
+  private final Clock clock;
+
+  private final LoadingCache<String, AtomicLong> killAttemptsByJob;
+  private final LoadingCache<String, AtomicLong> killSuccessesByJob;
+
+  @Inject
+  SlaKillController(
+      ScheduledExecutorService executor,
+      UpdateActionBatchWorker batchWorker,
+      SlaManager slaManager,
+      Clock clock,
+      BackoffStrategy backoffStrategy,
+      StatsProvider statsProvider) {
+
+    this.executor = requireNonNull(executor);
+    this.batchWorker = requireNonNull(batchWorker);
+    this.slaManager = requireNonNull(slaManager);
+    this.backoffStrategy = requireNonNull(backoffStrategy);
+    this.clock = requireNonNull(clock);
+
+    killAttemptsByJob = newCounterCache(statsProvider);
+    killSuccessesByJob = newCounterCache(statsProvider);
+  }
+
+  /**
+   * Builds a cache that lazily creates one stat counter per name and reuses it afterwards.
+   */
+  private static LoadingCache<String, AtomicLong> newCounterCache(StatsProvider statsProvider) {
+    return CacheBuilder.newBuilder().build(
+        new CacheLoader<String, AtomicLong>() {
+          @Override
+          public AtomicLong load(String key) {
+            return statsProvider.makeCounter(key);
+          }
+        }
+    );
+  }
+
+  /**
+   * Perform an SLA-aware kill on a given {@link IInstanceKey instance} and
+   * {@link IScheduledTask task} combination. Adds {@link JobInstanceUpdateEvent}s to show progress
+   * of the kill (e.g. whether it is waiting for the SLA check to pass or whether it has already
+   * succeeded and is in the process of killing). SLA-aware kills are performed asynchronously
+   * to calling this method.
+   *
+   * Kills will retry until:
+   *
+   *  - The kill is successful.
+   *  - The update becomes inactive.
+   *  - The task supplied is killed for some other reason.
+   *
+   * @param storeProvider A storage provider to access/modify update information.
+   * @param instance The {@link IInstanceKey} of the task being killed.
+   * @param task The task being killed.
+   * @param key The update key.
+   * @param slaPolicy The {@link ISlaPolicy} to use.
+   * @param status The update status when the kill was issued; ROLLING_FORWARD or ROLLING_BACK.
+   * @param killCommand The consumer provided to execute on a successful SLA check. This consumer
+   *    should perform the "kill" of the task (i.e. do the transition to KILLING).
+   */
+  void slaKill(
+      MutableStoreProvider storeProvider,
+      IInstanceKey instance,
+      IScheduledTask task,
+      IJobUpdateKey key,
+      ISlaPolicy slaPolicy,
+      JobUpdateStatus status,
+      Consumer<MutableStoreProvider> killCommand) {
+
+    if (!updateEventExists(
+            storeProvider,
+            instance,
+            key,
+            determineActionFromStatus(status),
+            SLA_CHECKING_MESSAGE)) {
+      // The check above is to ensure we do not add duplicate instance update events since slaKill
+      // can be called multiple times for one instance (e.g. slaKill is called, the update is
+      // paused, and then slaKill is called again after an unpause).
+      addUpdatingInstanceEvent(
+          storeProvider,
+          instance,
+          key,
+          clock.nowMillis(),
+          determineActionFromStatus(status),
+          SLA_CHECKING_MESSAGE);
+    }
+
+    retryingSlaKill(
+        instance,
+        task.getAssignedTask().getTaskId(),
+        key,
+        slaPolicy,
+        status,
+        killCommand,
+        0L);
+  }
+
+  private void retryingSlaKill(
+      IInstanceKey instance,
+      String taskId,
+      IJobUpdateKey key,
+      ISlaPolicy slaPolicy,
+      JobUpdateStatus status,
+      Consumer<MutableStoreProvider> killCommand,
+      long retryInMs) {
+
+    batchWorker.execute(storeProvider -> {
+      if (updateInStatus(storeProvider, key, status)) {
+        // The update is still ongoing; only proceed if the task can still be killed.
+        storeProvider
+            .getTaskStore()
+            .fetchTask(taskId)
+            .filter(task -> isKillable(task.getStatus()))
+            .ifPresent(task -> {
+              incrementJobStatCounter(killAttemptsByJob, SLA_KILL_ATTEMPT, instance.getJobKey());
+              slaManager.checkSlaThenAct(
+                  task,
+                  slaPolicy,
+                  slaStoreProvider -> performKill(
+                      slaStoreProvider,
+                      instance,
+                      key,
+                      status,
+                      killCommand),
+                  ImmutableMap.of(),
+                  // If the task is not assigned, force the update since it does not affect the
+                  // SLA. For example, if a task is THROTTLED or PENDING, we probably don't care
+                  // if the update replaces it with a new instance.
+                  !SLAVE_ASSIGNED_STATES.contains(task.getStatus()));
+
+              // We retry all SLA kills since it is possible that the SLA check fails and thus we
+              // will want to reevaluate later. If the kill succeeds or the update is cancelled by
+              // the next time the kill is retried, it will be a NOOP.
+              long backoff = backoffStrategy.calculateBackoffMs(retryInMs);
+              executor.schedule(
+                  () -> retryingSlaKill(
+                      instance,
+                      taskId,
+                      key,
+                      slaPolicy,
+                      status,
+                      killCommand,
+                      backoff),
+                  backoff,
+                  TimeUnit.MILLISECONDS);
+            });
+      }
+
+      return BatchWorker.NO_RESULT;
+    });
+  }
+
+  /**
+   * Passed into {@link SlaManager#checkSlaThenAct}. Actually performs the kill on the instance.
+   *
+   * Ensures that the update is still valid (the update's status has not changed since the kill
+   * was initiated) before persisting new state.
+   */
+  private BatchWorker.NoResult performKill(
+      MutableStoreProvider slaStoreProvider,
+      IInstanceKey instance,
+      IJobUpdateKey key,
+      JobUpdateStatus status,
+      Consumer<MutableStoreProvider> killCommand) {
+
+    if (updateInStatus(slaStoreProvider, key, status)) {
+      // Check again that the status is the same as when the command was issued. We do this because
+      // the SLA kill is executed asynchronously and the status may have changed (due to a pause or
+      // waiting for a pulse).
+      LOG.info("Performing SLA-aware kill of " + instance);
+      addUpdatingInstanceEvent(
+          slaStoreProvider,
+          instance,
+          key,
+          clock.nowMillis(),
+          determineActionFromStatus(status),
+          SLA_PASSED_MESSAGE);
+      killCommand.accept(slaStoreProvider);
+      incrementJobStatCounter(killSuccessesByJob, SLA_KILL_SUCCESS, instance.getJobKey());
+    } else {
+      // Between issuing the SLA kill and the kill being executed, the update has left the status
+      // it was in (ROLLING_FORWARD or ROLLING_BACK) when the kill was issued. Skip the kill until
+      // the update becomes active again.
+      LOG.info("Update " + key + " is not active, skipping SLA kill of " + instance);
+    }
+
+    return BatchWorker.NO_RESULT;
+  }
+
+  private static void incrementJobStatCounter(
+      LoadingCache<String, AtomicLong> counter,
+      String prefix,
+      IJobKey jobKey) {
+
+    counter.getUnchecked(prefix + JobKeys.canonicalString(jobKey)).incrementAndGet();
+  }
+
+  /**
+   * Checks whether update {@code key} has an instance event with the given instance ID,
+   * {@link JobUpdateAction}, and message. Fails fast if the update itself does not exist.
+   */
+  private static boolean updateEventExists(
+      MutableStoreProvider storeProvider,
+      IInstanceKey instance,
+      IJobUpdateKey key,
+      JobUpdateAction action,
+      String message) {
+
+    return storeProvider
+        .getJobUpdateStore()
+        .fetchJobUpdate(key)
+        .orElseThrow(() -> new IllegalStateException("No job update found for " + key))
+        .getInstanceEvents()
+        .stream()
+        .anyMatch(e -> e.getInstanceId() == instance.getInstanceId()
+            && e.getAction() == action
+            && e.isSetMessage()
+            && e.getMessage().equals(message));
+  }
+
+  private static void addUpdatingInstanceEvent(
+      MutableStoreProvider storeProvider,
+      IInstanceKey instance,
+      IJobUpdateKey key,
+      long timestampMs,
+      JobUpdateAction action,
+      String message) {
+
+    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(
+        new JobInstanceUpdateEvent()
+            .setInstanceId(instance.getInstanceId())
+            .setTimestampMs(timestampMs)
+            .setAction(action)
+            .setMessage(message));
+    storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(key, event);
+  }
+
+  private static boolean updateInStatus(
+      MutableStoreProvider storeProvider,
+      IJobUpdateKey key,
+      JobUpdateStatus status) {
+
+    return storeProvider
+        .getJobUpdateStore()
+        .fetchJobUpdate(key)
+        .filter(jobUpdateDetails -> Updates.getJobUpdateStatus(jobUpdateDetails) == status)
+        .isPresent();
+  }
+
+  /**
+   * Determine the {@link JobUpdateAction} that corresponds with a given {@link JobUpdateStatus}.
+   * For example, ROLLING_FORWARD corresponds with INSTANCE_UPDATING actions while ROLLING_BACK
+   * corresponds with INSTANCE_ROLLING_BACK actions.
+   */
+  private static JobUpdateAction determineActionFromStatus(JobUpdateStatus status) {
+    switch (status) {
+      case ROLLING_BACK:
+        return JobUpdateAction.INSTANCE_ROLLING_BACK;
+      case ROLLING_FORWARD:
+        return JobUpdateAction.INSTANCE_UPDATING;
+      default:
+        throw new RuntimeException("Unexpected status " + status + " encountered when"
+            + " performing an SLA-aware update.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
index 3992aa7..bc8008e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -32,7 +32,6 @@ import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
-import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.updater.strategy.BatchStrategy;
@@ -43,7 +42,7 @@ import static java.util.Objects.requireNonNull;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import static org.apache.aurora.scheduler.base.Numbers.toRange;
+import static org.apache.aurora.scheduler.updater.Updates.getConfig;
 
 /**
  * A factory that produces job updaters based on a job update configuration.
@@ -139,21 +138,6 @@ interface UpdateFactory {
     static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) {
       return Updates.getInstanceIds(instanceGroups).asSet(DiscreteDomain.integers());
     }
-
-    private static Optional<ITaskConfig> getConfig(
-        int id,
-        Set<IInstanceTaskConfig> instanceGroups) {
-
-      for (IInstanceTaskConfig group : instanceGroups) {
-        for (IRange range : group.getInstances()) {
-          if (toRange(range).contains(id)) {
-            return Optional.of(group.getTask());
-          }
-        }
-      }
-
-      return Optional.empty();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
index 74ee174..bdc17b7 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -14,8 +14,10 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 
+import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import com.beust.jcommander.Parameter;
@@ -26,13 +28,20 @@ import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.common.util.TruncatedBinaryBackoff;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,20 +62,41 @@ public class UpdaterModule extends AbstractModule {
     @Parameter(names = "-update_affinity_reservation_hold_time",
         description = "How long to wait for a reserved agent to reoffer freed up resources.")
     public TimeAmount affinityExpiration = new TimeAmount(3L, Time.MINUTES);
+
+    @Parameter(names = "-sla_aware_action_max_batch_size",
+        description = "The maximum number of sla aware update actions that can be processed"
+            + " in a batch.")
+    public int slaAwareActionMaxBatchSize = 300;
+
+    @Parameter(names = "-sla_aware_kill_retry_min_delay",
+        description = "Minimum amount of time to wait between attempting to perform an SLA-Aware"
+            + " kill on a task.")
+    public TimeAmount slaAwareKillRetryMinDelay = new TimeAmount(1L, Time.MINUTES);
+
+    @Parameter(names = "-sla_aware_kill_retry_max_delay",
+        description = "Maximum amount of time to wait between attempting to perform an SLA-Aware"
+            + " kill on a task.")
+    public TimeAmount slaAwareKillRetryMaxDelay = new TimeAmount(5L, Time.MINUTES);
   }
 
   private final ScheduledExecutorService executor;
+  private final Optional<UpdateActionBatchWorker> batchWorker;
   private final Options options;
 
   public UpdaterModule(Options options) {
     this(
         AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG),
+        Optional.empty(),
         options);
   }
 
   @VisibleForTesting
-  UpdaterModule(ScheduledExecutorService executor, Options options) {
+  UpdaterModule(ScheduledExecutorService executor,
+                Optional<UpdateActionBatchWorker> batchWorker,
+                Options options) {
+
     this.executor = Objects.requireNonNull(executor);
+    this.batchWorker = Objects.requireNonNull(batchWorker);
     this.options = options;
   }
 
@@ -87,6 +117,11 @@ public class UpdaterModule extends AbstractModule {
         }
         expose(UpdateAgentReserver.class);
 
+        bind(BackoffStrategy.class).toInstance(new TruncatedBinaryBackoff(
+            options.slaAwareKillRetryMinDelay,
+            options.slaAwareKillRetryMaxDelay));
+        bind(SlaKillController.class).in(Singleton.class);
+
         bind(ScheduledExecutorService.class).toInstance(executor);
         bind(UpdateFactory.class).to(UpdateFactory.UpdateFactoryImpl.class);
         bind(UpdateFactory.UpdateFactoryImpl.class).in(Singleton.class);
@@ -98,8 +133,29 @@ public class UpdaterModule extends AbstractModule {
       }
     });
 
+    if (batchWorker.isPresent()) {
+      bind(UpdateActionBatchWorker.class).toInstance(batchWorker.get());
+    } else {
+      bind(UpdateActionBatchWorker.class).in(Singleton.class);
+    }
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+        .to(UpdateActionBatchWorker.class);
+
     PubsubEventModule.bindSubscriber(binder(), JobUpdateEventSubscriber.class);
     SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
         .to(JobUpdateEventSubscriber.class);
   }
+
+  public static class UpdateActionBatchWorker extends BatchWorker<NoResult> {
+    // Runs SLA-aware update actions; batch size is capped by -sla_aware_action_max_batch_size.
+    @Inject
+    UpdateActionBatchWorker(CliOptions options, Storage storage, StatsProvider statsProvider) {
+      super(storage, statsProvider, options.updater.slaAwareActionMaxBatchSize);
+    }
+
+    @Override
+    protected String serviceName() {
+      return "UpdateActionBatchWorker";
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
index f949fd5..d15217b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -13,16 +13,22 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static org.apache.aurora.scheduler.base.Numbers.toRange;
 
 /**
  * Utility functions for job updates.
@@ -55,4 +61,29 @@ public final class Updates {
 
     return builder.build();
   }
+
+  /**
+   * Looks up the task config of the first group in {@code instanceGroups} whose instance ranges
+   * include {@code id}. Returns an empty {@link Optional} when no group covers the instance.
+   */
+  public static Optional<ITaskConfig> getConfig(
+      int id,
+      Set<IInstanceTaskConfig> instanceGroups) {
+
+    for (IInstanceTaskConfig candidate : instanceGroups) {
+      boolean covered = candidate.getInstances().stream()
+          .anyMatch(range -> toRange(range).contains(id));
+      if (covered) {
+        return Optional.of(candidate.getTask());
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Get the latest {@link JobUpdateStatus} of an update: the status of its last update event.
+   */
+  static JobUpdateStatus getJobUpdateStatus(IJobUpdateDetails jobUpdateDetails) {
+    return Iterables.getLast(jobUpdateDetails.getUpdateEvents()).getStatus();
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index 4e39862..5c2d953 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -31,7 +31,8 @@ class UpdaterConfig(object):
                max_total_failures,
                rollback_on_failure=True,
                wait_for_batch_completion=False,
-               pulse_interval_secs=None):
+               pulse_interval_secs=None,
+               sla_aware=None):
 
     if batch_size <= 0:
       raise ValueError('Batch size should be greater than 0')
@@ -48,6 +49,7 @@ class UpdaterConfig(object):
     self.rollback_on_failure = rollback_on_failure
     self.wait_for_batch_completion = wait_for_batch_completion
     self.pulse_interval_secs = pulse_interval_secs
+    self.sla_aware = sla_aware
 
   @classmethod
   def instances_to_ranges(cls, instances):
@@ -86,7 +88,9 @@ class UpdaterConfig(object):
         rollbackOnFailure=self.rollback_on_failure,
         waitForBatchCompletion=self.wait_for_batch_completion,
         updateOnlyTheseInstances=self.instances_to_ranges(instances) if instances else None,
-        blockIfNoPulsesAfterMs=self.pulse_interval_secs * 1000 if self.pulse_interval_secs else None
+        blockIfNoPulsesAfterMs=(self.pulse_interval_secs * 1000 if self.pulse_interval_secs
+            else None),
+        slaAware=self.sla_aware
     )
 
   def __eq__(self, other):

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/main/python/apache/aurora/config/schema/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py
index 7baded7..bf75660 100644
--- a/src/main/python/apache/aurora/config/schema/base.py
+++ b/src/main/python/apache/aurora/config/schema/base.py
@@ -37,6 +37,7 @@ class UpdateConfig(Struct):
   rollback_on_failure         = Default(Boolean, True)
   wait_for_batch_completion   = Default(Boolean, False)
   pulse_interval_secs         = Integer
+  sla_aware                   = Default(Boolean, False)
 
 
 class HttpHealthChecker(Struct):

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index abee095..8795b15 100644
--- a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -121,6 +121,11 @@ public final class TaskTestUtil {
   }
 
   public static ITaskConfig makeConfig(IJobKey job, boolean prod) {
+    return makeConfig(job, prod, Optional.of(SlaPolicy.percentageSlaPolicy(
+        new PercentageSlaPolicy().setPercentage(95.0).setDurationSecs(1800))));
+  }
+
+  public static ITaskConfig makeConfig(IJobKey job, boolean prod, Optional<SlaPolicy> slaPolicy) {
     return ITaskConfig.build(new TaskConfig()
         .setJob(job.newBuilder())
         .setOwner(new Identity().setUser(job.getRole() + "-user"))
@@ -130,8 +135,7 @@ public final class TaskTestUtil {
         .setProduction(prod)
         .setTier(prod ? PROD_TIER_NAME : DEV_TIER_NAME)
         .setPartitionPolicy(new PartitionPolicy().setDelaySecs(5).setReschedule(true))
-        .setSlaPolicy(SlaPolicy.percentageSlaPolicy(
-            new PercentageSlaPolicy().setPercentage(95.0).setDurationSecs(1800)))
+        .setSlaPolicy(slaPolicy.orElse(null))
         .setConstraints(ImmutableSet.of(
             new Constraint(
                 "valueConstraint",

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index dcf5889..5e34680 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -169,6 +169,9 @@ public class CommandLineTest {
     expected.zk.digestCredentials = "testing";
     expected.updater.enableAffinity = true;
     expected.updater.affinityExpiration = TEST_TIME;
+    expected.updater.slaAwareActionMaxBatchSize = 42;
+    expected.updater.slaAwareKillRetryMinDelay = new TimeAmount(42, Time.DAYS);
+    expected.updater.slaAwareKillRetryMaxDelay = new TimeAmount(42, Time.DAYS);
     expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
     expected.snapshot.snapshotInterval = TEST_TIME;
     expected.logPersistence.maxLogEntrySize = TEST_DATA;
@@ -320,6 +323,9 @@ public class CommandLineTest {
         "-zk_digest_credentials=testing",
         "-enable_update_affinity=true",
         "-update_affinity_reservation_hold_time=42days",
+        "-sla_aware_action_max_batch_size=42",
+        "-sla_aware_kill_retry_min_delay=42days",
+        "-sla_aware_kill_retry_max_delay=42days",
         "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
         "-dlog_snapshot_interval=42days",
         "-dlog_max_entry_size=42GB",

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
index 3a06a45..6af66aa 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
@@ -105,7 +105,8 @@ public abstract class AbstractJobUpdateStoreTest {
         .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent()
             .setTimestampMs(2)
             .setInstanceId(1)
-            .setAction(INSTANCE_UPDATING)));
+            .setAction(INSTANCE_UPDATING)
+            .setMessage("message2")));
     JobUpdateInstructions instructions = builder.getUpdate().getInstructions();
     Stream.of(
         instructions.getInitialState().stream()
@@ -630,6 +631,7 @@ public abstract class AbstractJobUpdateStoreTest {
             .setMaxFailedInstances(1)
             .setMinWaitInInstanceRunningMs(200)
             .setRollbackOnFailure(true)
+            .setSlaAware(true)
             .setWaitForBatchCompletion(true)
             .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0), new Range(3, 5)))));
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index 0aea369..3bfef46 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -188,6 +188,14 @@ public final class FakeScheduledExecutor extends FakeClock {
     }
   }
 
+  /**
+   * Returns the number of deferred work entries currently held. This is the count that
+   * {@link #assertEmpty()} requires to be zero.
+   */
+  public int countDeferredWork() {
+    return deferredWork.size();
+  }
+
   public void assertEmpty() {
     assertEquals(ImmutableList.<Pair<Long, Runnable>>of(), deferredWork);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index aa1cb2b..334fd5d 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -32,6 +32,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.Container;
+import org.apache.aurora.gen.CoordinatorSlaPolicy;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.ExplicitReconciliationSettings;
 import org.apache.aurora.gen.HostStatus;
@@ -149,6 +150,7 @@ import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.CREATE
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.CREATE_OR_UPDATE_CRON;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.DRAIN_HOSTS;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.END_MAINTENANCE;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.INVALID_SLA_AWARE_UPDATE;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.KILL_TASKS;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAINTENANCE_STATUS;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_JOB_UPDATE_MESSAGE;
@@ -561,11 +563,35 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
+  /**
+   * Builds a service task for job update tests with a placeholder coordinator SLA policy.
+   */
   private IScheduledTask buildTaskForJobUpdate(int instanceId, String executorData) {
+    return buildTaskForJobUpdate(
+        instanceId,
+        executorData,
+        Optional.of(SlaPolicy.coordinatorSlaPolicy(
+            new CoordinatorSlaPolicy("test-url", "test-key"))));
+  }
+
+  /**
+   * Builds a service task for job update tests.
+   *
+   * @param instanceId Instance ID to assign to the task.
+   * @param executorData Executor data; tests use it to tell old and new configs apart.
+   * @param slaPolicy SLA policy to set on the task config, or empty to leave it null.
+   * @return The built task.
+   */
+  private IScheduledTask buildTaskForJobUpdate(
+      int instanceId,
+      String executorData,
+      Optional<SlaPolicy> slaPolicy) {
+
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()
             .setInstanceId(instanceId)
             .setTask(populatedTask()
                 .setIsService(true)
+                .setSlaPolicy(slaPolicy.orElse(null))
                 .setExecutorConfig(new ExecutorConfig().setName(EXECUTOR_NAME)
                     .setData(executorData)))));
   }
@@ -1745,6 +1759,27 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testStartUpdateFailsWhenSlaAwareWithNoPolicy() throws Exception {
+    // An SLA-aware update whose desired config has no SLA policy must be rejected.
+    IScheduledTask existing = buildTaskForJobUpdate(0, "old");
+    ITaskConfig desired =
+        buildTaskForJobUpdate(0, "new", Optional.empty()).getAssignedTask().getTask();
+    JobUpdateSettings slaAwareSettings = buildJobUpdateSettings().setSlaAware(true);
+    IJobUpdate update = buildJobUpdate(
+        1,
+        desired,
+        ImmutableMap.of(existing.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 0))),
+        slaAwareSettings);
+
+    control.replay();
+
+    Response response = thrift.startJobUpdate(buildJobUpdateRequest(update), AUDIT_MESSAGE);
+
+    assertEquals(invalidResponse(INVALID_SLA_AWARE_UPDATE), response);
+    assertEquals(0L, statsProvider.getLongValue(START_JOB_UPDATE));
+  }
+
+  @Test
   public void testPauseJobUpdateByCoordinator() throws Exception {
     expectGetRemoteUser();
     jobUpdateController.pause(UPDATE_KEY, AUDIT);
@@ -1956,6 +1991,15 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
       ITaskConfig newConfig,
       ImmutableMap<ITaskConfig, ImmutableSet<Range>> oldConfigMap) {
 
+    return buildJobUpdate(instanceCount, newConfig, oldConfigMap, buildJobUpdateSettings());
+  }
+
+  private static IJobUpdate buildJobUpdate(
+      int instanceCount,
+      ITaskConfig newConfig,
+      ImmutableMap<ITaskConfig, ImmutableSet<Range>> oldConfigMap,
+      JobUpdateSettings jobUpdateSettings) {
+
     ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
     for (Map.Entry<ITaskConfig, ImmutableSet<Range>> entry : oldConfigMap.entrySet()) {
       builder.add(new InstanceTaskConfig(entry.getKey().newBuilder(), entry.getValue()));
@@ -1967,7 +2011,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
             .setUser(IDENTITY.getUser())
             .setMetadata(METADATA))
         .setInstructions(new JobUpdateInstructions()
-            .setSettings(buildJobUpdateSettings())
+            .setSettings(jobUpdateSettings)
             .setDesiredState(new InstanceTaskConfig()
                 .setTask(newConfig.newBuilder())
                 .setInstances(ImmutableSet.of(new Range(0, instanceCount - 1))))

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
index 43f857d..259aaa0 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
@@ -55,6 +55,7 @@ public class AddTaskTest extends EasyMockTest {
   private StateManager stateManager;
   private InstanceActionHandler handler;
   private UpdateAgentReserver updateAgentReserver;
+  private SlaKillController slaKillController;
 
   @Before
   public void setUp() {
@@ -63,6 +64,7 @@ public class AddTaskTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
     updateAgentReserver = createMock(UpdateAgentReserver.class);
     handler = new InstanceActionHandler.AddTask();
+    slaKillController = createMock(SlaKillController.class);
   }
 
   @Test
@@ -83,7 +85,8 @@ public class AddTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_FORWARD,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test
@@ -101,7 +104,8 @@ public class AddTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_FORWARD,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -117,6 +121,7 @@ public class AddTaskTest extends EasyMockTest {
         stateManager,
         updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
-        UPDATE_ID);
+        UPDATE_ID,
+        slaKillController);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e28e73b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 5667a1b..c96def1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -13,10 +13,12 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -43,6 +45,7 @@ import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.common.util.TruncatedBinaryBackoff;
+import org.apache.aurora.gen.CountSlaPolicy;
 import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateAction;
@@ -58,10 +61,13 @@ import org.apache.aurora.gen.Metadata;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.ServerInfo;
+import org.apache.aurora.gen.SlaPolicy;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -72,6 +78,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
+import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.StateManagerImpl;
@@ -88,12 +95,14 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
 import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;
+import org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker;
 import org.easymock.EasyMock;
 import org.easymock.IExpectationSetters;
 import org.junit.After;
@@ -119,6 +128,7 @@ import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -126,6 +136,7 @@ import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExe
 import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class JobUpdaterIT extends EasyMockTest {
@@ -143,6 +154,8 @@ public class JobUpdaterIT extends EasyMockTest {
   private static final ITaskConfig OLD_CONFIG =
       setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata");
   private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, "newdata");
+  private static final ITaskConfig SLA_AWARE_CONFIG =
+      setCountSlaPolicy(setExecutorData(OLD_CONFIG, "sladata"), 2, 0);
   private static final long PULSE_TIMEOUT_MS = 10000;
   private static final ImmutableSet<Metadata> METADATA = ImmutableSet.of(
       new Metadata("k1", "v1"), new Metadata("k2", "v2"));
@@ -162,6 +175,13 @@ public class JobUpdaterIT extends EasyMockTest {
     return ITaskConfig.build(builder);
   }
 
+  private static ITaskConfig setCountSlaPolicy(ITaskConfig task, int count, int durationSecs) {
+    // Returns a copy of task with a CountSlaPolicy; its duration is in seconds (api.thrift).
+    TaskConfig builder = task.newBuilder();
+    builder.setSlaPolicy(SlaPolicy.countSlaPolicy(new CountSlaPolicy(count, durationSecs)));
+    return ITaskConfig.build(builder);
+  }
+
   @Before
   public void setUp() throws Exception {
     // Avoid console spam due to stats registered multiple times.
@@ -171,13 +191,24 @@ public class JobUpdaterIT extends EasyMockTest {
     driver = createMock(Driver.class);
     shutdownCommand = createMock(Command.class);
     eventBus = new EventBus();
-    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
-
-    UpdaterModule.Options options = new UpdaterModule.Options();
-    options.enableAffinity = true;
+    TaskEventBatchWorker taskEventBatchWorker = createMock(TaskEventBatchWorker.class);
+    UpdateActionBatchWorker updateActionBatchWorker = createMock(UpdateActionBatchWorker.class);
+
+    UpdaterModule.Options updaterOptions = new UpdaterModule.Options();
+    updaterOptions.enableAffinity = true;
+    updaterOptions.slaAwareKillRetryMinDelay = new TimeAmount(
+        WATCH_TIMEOUT.getValue(),
+        WATCH_TIMEOUT.getUnit());
+    updaterOptions.slaAwareKillRetryMaxDelay = new TimeAmount(
+        WATCH_TIMEOUT.getValue(),
+        WATCH_TIMEOUT.getUnit());
+    SlaModule.Options slaOptions = new SlaModule.Options();
+    slaOptions.minRequiredInstances = 3;
 
     Injector injector = Guice.createInjector(
-        new UpdaterModule(executor, options),
+        new UpdaterModule(executor, Optional.of(updateActionBatchWorker), updaterOptions),
+        new SlaModule(slaOptions),
+        new TierModule(TaskTestUtil.TIER_CONFIG),
         new MemStorageModule(),
         new AbstractModule() {
           @Override
@@ -197,7 +228,13 @@ public class JobUpdaterIT extends EasyMockTest {
             bind(EventSink.class).toInstance(eventBus::post);
             bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
             bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
-            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
+            bind(TaskEventBatchWorker.class).toInstance(taskEventBatchWorker);
+            bind(UpdateActionBatchWorker.class).toInstance(updateActionBatchWorker);
+            bind(IServerInfo.class).toInstance(
+                IServerInfo.build(
+                    new ServerInfo()
+                        .setClusterName("JobUpdaterITCluster")
+                        .setStatsUrlPrefix("test_stats_prefix")));
           }
         });
     updater = injector.getInstance(JobUpdateController.class);
@@ -206,7 +243,8 @@ public class JobUpdaterIT extends EasyMockTest {
     stateManager = injector.getInstance(StateManager.class);
     eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class));
     subscriber = injector.getInstance(JobUpdateEventSubscriber.class);
-    expectBatchExecute(batchWorker, storage, control).anyTimes();
+    expectBatchExecute(taskEventBatchWorker, storage, control).anyTimes();
+    expectBatchExecute(updateActionBatchWorker, storage, control).anyTimes();
   }
 
   @After
@@ -1523,6 +1561,255 @@ public class JobUpdaterIT extends EasyMockTest {
     }
   }
 
+  @Test
+  public void testSuccessfulSlaAwareUpdate() throws Exception {
+    expectTaskKilled().times(3); // One kill per instance.
+
+    control.replay();
+
+    // The batch size is 3, but SLA_AWARE_CONFIG's CountSlaPolicy requires 2 of the 3 instances
+    // to stay up, so only one instance may be down (and thus killed) at a time.
+    JobUpdate builder = makeJobUpdate(3, SLA_AWARE_CONFIG, makeInstanceConfig(0, 2, OLD_CONFIG))
+        .newBuilder();
+    builder.getInstructions().getSettings().setSlaAware(true);
+    IJobUpdate update = setInstanceCount(IJobUpdate.build(builder), 3);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    updater.start(update, AUDIT);
+    actions.put(0, INSTANCE_UPDATING);
+    actions.put(0, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(1, INSTANCE_UPDATING);
+    actions.put(1, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(2, INSTANCE_UPDATING);
+    actions.put(2, INSTANCE_UPDATING); // Awaiting SLA check to pass
+
+    // SLA aware update should only send one instance to KILLING -- find which one.
+    int firstTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, firstTaskBeingKilled, KILLED);
+
+    // Sanity check: once the SLA kill retry delay (WATCH_TIMEOUT) passes, but before the killed
+    // task is RUNNING again, no other task is killed.
+    clock.advance(WATCH_TIMEOUT);
+    assertTrue(Iterables.isEmpty(getTasksInState(JOB, KILLING)));
+    assertState(ROLLING_FORWARD, actions.build());
+
+    // First task comes back up; after the minimum running wait it is marked UPDATED.
+    changeState(JOB, firstTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATED);
+
+    // Update the second task once the first is healthy again.
+    int secondTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(secondTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, secondTaskBeingKilled, KILLED);
+    changeState(JOB, secondTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(secondTaskBeingKilled, INSTANCE_UPDATED);
+
+    // Update the final task the same way.
+    int finalTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(finalTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, finalTaskBeingKilled, KILLED);
+    changeState(JOB, finalTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(finalTaskBeingKilled, INSTANCE_UPDATED);
+
+    assertState(ROLLED_FORWARD, actions.build());
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, SLA_AWARE_CONFIG, 1, SLA_AWARE_CONFIG, 2, SLA_AWARE_CONFIG));
+  }
+
+  @Test
+  public void testSuccessfulSlaAwareUpdateWithPause() throws Exception {
+    expectTaskKilled().times(3); // One kill per instance; pausing causes no extra kills.
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(3, SLA_AWARE_CONFIG, makeInstanceConfig(0, 2, OLD_CONFIG))
+        .newBuilder();
+    builder.getInstructions().getSettings().setSlaAware(true);
+    IJobUpdate update = setInstanceCount(IJobUpdate.build(builder), 3);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    updater.start(update, AUDIT);
+    actions.put(0, INSTANCE_UPDATING);
+    actions.put(0, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(1, INSTANCE_UPDATING);
+    actions.put(1, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(2, INSTANCE_UPDATING);
+    actions.put(2, INSTANCE_UPDATING); // Awaiting SLA check to pass
+
+    int firstTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, firstTaskBeingKilled, KILLED);
+    changeState(JOB, firstTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+
+    // Pause once the first killed instance is RUNNING again, before it is marked UPDATED.
+    assertState(ROLLING_FORWARD, actions.build());
+    updater.pause(UPDATE_ID, AUDIT);
+    assertState(ROLL_FORWARD_PAUSED, actions.build());
+
+    // Ensure no tasks are killed while paused, even after the SLA kill retry delay passes.
+    clock.advance(WATCH_TIMEOUT);
+    assertTrue(Iterables.isEmpty(getTasksInState(JOB, KILLING)));
+
+    // Resume; after the minimum running wait the first instance is marked UPDATED.
+    updater.resume(UPDATE_ID, AUDIT);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATED);
+
+    int secondTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(secondTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, secondTaskBeingKilled, KILLED);
+    changeState(JOB, secondTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(secondTaskBeingKilled, INSTANCE_UPDATED);
+
+    int finalTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(finalTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, finalTaskBeingKilled, KILLED);
+    changeState(JOB, finalTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(finalTaskBeingKilled, INSTANCE_UPDATED);
+
+    assertState(ROLLED_FORWARD, actions.build());
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, SLA_AWARE_CONFIG, 1, SLA_AWARE_CONFIG, 2, SLA_AWARE_CONFIG));
+  }
+
+  @Test
+  public void testSuccessfulSlaAwareUpdateWithRollback() throws Exception {
+    expectTaskKilled().times(4); // Two kills rolling forward, two rolling back.
+    // We need both the old and new config to be SLA aware for this rollback
+    ITaskConfig slaAwareOldConfig = setCountSlaPolicy(setExecutorData(OLD_CONFIG, "old"), 2, 0);
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(
+        3,
+        SLA_AWARE_CONFIG,
+        makeInstanceConfig(0, 2, slaAwareOldConfig)).newBuilder();
+    builder.getInstructions().getSettings().setSlaAware(true);
+    IJobUpdate update = setInstanceCount(IJobUpdate.build(builder), 3);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    updater.start(update, AUDIT);
+    actions.put(0, INSTANCE_UPDATING);
+    actions.put(0, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(1, INSTANCE_UPDATING);
+    actions.put(1, INSTANCE_UPDATING); // Awaiting SLA check to pass
+    actions.put(2, INSTANCE_UPDATING);
+    actions.put(2, INSTANCE_UPDATING); // Awaiting SLA check to pass
+
+    int firstTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, firstTaskBeingKilled, KILLED);
+    changeState(JOB, firstTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(firstTaskBeingKilled, INSTANCE_UPDATED);
+
+    int secondTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(secondTaskBeingKilled, INSTANCE_UPDATING); // SLA check passed, killing
+    changeState(JOB, secondTaskBeingKilled, KILLED);
+    changeState(JOB, secondTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+
+    int finalTaskNotKilled = Iterables.getOnlyElement(getTasksInState(JOB, RUNNING)
+        .stream()
+        .filter(t -> t.getAssignedTask().getTask().equals(slaAwareOldConfig))
+        .map(t -> t.getAssignedTask().getInstanceId())
+        .collect(Collectors.toList()));
+
+    // Roll back with 2/3 instances on the new config; the second has not been marked UPDATED.
+    assertJobState(
+        JOB,
+        ImmutableMap.of(
+            firstTaskBeingKilled, SLA_AWARE_CONFIG,
+            secondTaskBeingKilled, SLA_AWARE_CONFIG,
+            finalTaskNotKilled, slaAwareOldConfig));
+    updater.rollback(UPDATE_ID, AUDIT);
+    actions.put(firstTaskBeingKilled, INSTANCE_ROLLING_BACK);
+    actions.put(firstTaskBeingKilled, INSTANCE_ROLLING_BACK); // Awaiting SLA check to pass
+    actions.put(secondTaskBeingKilled, INSTANCE_ROLLING_BACK);
+    actions.put(secondTaskBeingKilled, INSTANCE_ROLLING_BACK); // Awaiting SLA check to pass
+    actions.put(finalTaskNotKilled, INSTANCE_ROLLING_BACK);
+    actions.put(finalTaskNotKilled, INSTANCE_ROLLED_BACK); // Task never updated originally
+
+    int firstRollbackTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(firstRollbackTaskBeingKilled, INSTANCE_ROLLING_BACK); // SLA check passed
+    changeState(JOB, firstRollbackTaskBeingKilled, KILLED);
+    changeState(JOB, firstRollbackTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(firstRollbackTaskBeingKilled, INSTANCE_ROLLED_BACK);
+
+    int secondRollbackTaskBeingKilled = Iterables
+        .getOnlyElement(getTasksInState(JOB, KILLING))
+        .getAssignedTask()
+        .getInstanceId();
+    actions.put(secondRollbackTaskBeingKilled, INSTANCE_ROLLING_BACK); // SLA check passed
+    changeState(JOB, secondRollbackTaskBeingKilled, KILLED);
+    changeState(JOB, secondRollbackTaskBeingKilled, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.put(secondRollbackTaskBeingKilled, INSTANCE_ROLLED_BACK);
+
+    assertState(ROLLED_BACK, actions.build());
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, slaAwareOldConfig, 1, slaAwareOldConfig, 2, slaAwareOldConfig));
+  }
+
+  private Collection<IScheduledTask> getTasksInState(IJobKey job, ScheduleStatus status) {
+    Query.Builder query = Query.jobScoped(job).byStatus(status); // Read-only; no write needed.
+    return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
+  }
+
   private static IJobUpdateSummary makeUpdateSummary(IJobUpdateKey key) {
     return IJobUpdateSummary.build(new JobUpdateSummary()
         .setUser("user")
@@ -1530,14 +1817,21 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... configs) {
+    return makeJobUpdate(1, NEW_CONFIG, configs); // Batches of one instance, to NEW_CONFIG.
+  }
+
+  private static IJobUpdate makeJobUpdate(int updateGroupSize,
+                                          ITaskConfig newConfig,
+                                          IInstanceTaskConfig... configs) {
+
     JobUpdate builder = new JobUpdate()
         .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder().setMetadata(METADATA))
         .setInstructions(new JobUpdateInstructions()
             .setDesiredState(new InstanceTaskConfig()
-                .setTask(NEW_CONFIG.newBuilder())
+                .setTask(newConfig.newBuilder())
                 .setInstances(ImmutableSet.of(new Range(0, 2))))
             .setSettings(new JobUpdateSettings()
-                .setUpdateGroupSize(1)
+                .setUpdateGroupSize(updateGroupSize)
                 .setRollbackOnFailure(true)
                 .setMinWaitInInstanceRunningMs(WATCH_TIMEOUT.as(Time.MILLISECONDS).intValue())
                 .setUpdateOnlyTheseInstances(ImmutableSet.of())));