You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/15 06:16:49 UTC

git commit: Store a lock association with job updates.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 8d5a62cfd -> 1c4f52782


Store a lock association with job updates.

Bugs closed: AURORA-613

Reviewed at https://reviews.apache.org/r/24727/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1c4f5278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1c4f5278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1c4f5278

Branch: refs/heads/master
Commit: 1c4f52782870087aee0187ea9797c22cc624a026
Parents: 8d5a62c
Author: Bill Farner <wf...@apache.org>
Authored: Thu Aug 14 20:19:54 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Aug 14 20:19:54 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/state/JobUpdater.java      |   4 +-
 .../aurora/scheduler/state/JobUpdaterImpl.java  |   8 +-
 .../scheduler/storage/ForwardingStore.java      |   8 +-
 .../scheduler/storage/JobUpdateStore.java       |  17 +-
 .../scheduler/storage/db/DBJobUpdateStore.java  |  27 ++-
 .../storage/db/JobUpdateDetailsMapper.java      |  38 +++-
 .../scheduler/storage/log/LogStorage.java       |   3 +-
 .../storage/log/SnapshotStoreImpl.java          |  49 +++--
 .../storage/log/WriteAheadStorage.java          |   6 +-
 .../thrift/SchedulerThriftInterface.java        |  10 +-
 .../storage/db/JobUpdateDetailsMapper.xml       |  54 +++--
 .../aurora/scheduler/storage/db/schema.sql      |  12 +-
 .../thrift/org/apache/aurora/gen/storage.thrift |  10 +-
 .../org/apache/aurora/gen/storage_local.thrift  |  24 --
 .../scheduler/state/JobUpdaterImplTest.java     |  11 +-
 .../scheduler/storage/backup/RecoveryTest.java  |   4 +-
 .../storage/db/DBJobUpdateStoreTest.java        | 219 ++++++++++++++-----
 .../scheduler/storage/db/DbLockStoreTest.java   |  34 ++-
 .../scheduler/storage/log/LogStorageTest.java   |   8 +-
 .../storage/log/SnapshotStoreImplTest.java      |  10 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |  15 +-
 21 files changed, 390 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
index f153444..eb472e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
@@ -25,10 +25,12 @@ public interface JobUpdater {
    *
    * @param request Job update request.
    * @param user User who initiated the update.
+   * @param lockToken Token for the lock held for this update.
    * @return Saved job update ID.
    * @throws UpdaterException Throws if update fails to start for any reason.
    */
-  String startJobUpdate(IJobUpdateRequest request, String user) throws UpdaterException;
+  String startJobUpdate(IJobUpdateRequest request, String user, String lockToken)
+      throws UpdaterException;
 
   /**
    * Thrown when job update related operation failed.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
index 6bcdf62..f21f27c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
@@ -101,8 +101,10 @@ class JobUpdaterImpl implements JobUpdater {
   }
 
   @Override
-  public String startJobUpdate(final IJobUpdateRequest request, final String user)
-      throws UpdaterException {
+  public String startJobUpdate(
+      final IJobUpdateRequest request,
+      final String user,
+      final String lockToken) throws UpdaterException {
 
     return storage.write(new MutateWork<String, UpdaterException>() {
       @Override
@@ -125,7 +127,7 @@ class JobUpdaterImpl implements JobUpdater {
             .setTimestampMs(clock.nowMillis()));
 
         try {
-          storeProvider.getJobUpdateStore().saveJobUpdate(update);
+          storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
           storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
         } catch (StorageException e) {
           throw new UpdaterException("Failed to start update.", e);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 3f083d6..b894a71 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -20,6 +20,7 @@ import java.util.Set;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -150,7 +151,12 @@ public class ForwardingStore implements
   }
 
   @Override
-  public List<IJobUpdateDetails> fetchAllJobUpdateDetails() {
+  public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
     return jobUpdateStore.fetchAllJobUpdateDetails();
   }
+
+  @Override
+  public boolean isActive(String updateId) {
+    return jobUpdateStore.isActive(updateId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index c05833f..599dbd8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -14,9 +14,11 @@
 package org.apache.aurora.scheduler.storage;
 
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.base.Optional;
 
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
@@ -47,10 +49,20 @@ public interface JobUpdateStore {
 
   /**
    * Fetches a read-only view of all job update details available in the store.
+   * TODO(wfarner): Generate immutable wrappers for storage.thrift structs, use an immutable object
+   *                here.
    *
    * @return A read-only view of all job update details.
    */
-  List<IJobUpdateDetails> fetchAllJobUpdateDetails();
+  Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
+
+  /**
+   * Determines whether an update ID represents a currently-active job update.
+   *
+   * @param updateId Job update ID.
+   * @return {@code true} if this update has exclusive access to the job, otherwise {@code false}.
+   */
+  boolean isActive(String updateId);
 
   interface Mutable extends JobUpdateStore {
 
@@ -69,8 +81,9 @@ public interface JobUpdateStore {
      * without having at least one {@link IJobUpdateEvent} present in the store will return empty.
      *
      * @param update Update to save.
+     * @param lockToken UUID identifying the lock associated with this update.
      */
-    void saveJobUpdate(IJobUpdate update);
+    void saveJobUpdate(IJobUpdate update, String lockToken);
 
     /**
      * Saves a new job update event.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index d659aa1..ec9b37c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -20,8 +20,9 @@ import javax.inject.Inject;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
 
-import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
@@ -58,11 +59,15 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
   }
 
   @Override
-  public void saveJobUpdate(IJobUpdate update) {
+  public void saveJobUpdate(IJobUpdate update, String lockToken) {
+    requireNonNull(update);
+    requireNonNull(lockToken);
+
     jobKeyMapper.merge(update.getSummary().getJobKey().newBuilder());
     detailsMapper.insert(update.newBuilder());
 
     String updateId = update.getSummary().getUpdateId();
+    detailsMapper.insertLockToken(updateId, lockToken);
 
     // Insert optional instance update overrides.
     Set<IRange> instanceOverrides =
@@ -113,16 +118,24 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
   @Override
   public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final String updateId) {
     return Optional.fromNullable(detailsMapper.selectDetails(updateId))
-        .transform(new Function<JobUpdateDetails, IJobUpdateDetails>() {
+        .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
           @Override
-          public IJobUpdateDetails apply(JobUpdateDetails input) {
-            return IJobUpdateDetails.build(input);
+          public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
+            return IJobUpdateDetails.build(input.getDetails());
           }
         });
   }
 
   @Override
-  public List<IJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return IJobUpdateDetails.listFromBuilders(detailsMapper.selectAllDetails());
+  public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+    return ImmutableSet.copyOf(detailsMapper.selectAllDetails());
+  }
+
+  @Override
+  public boolean isActive(String updateId) {
+    // We assume here that cascading deletes will cause a lock-update associative row to disappear
+    // when the lock is invalidated.  This further assumes that a lock row is deleted when a lock
+    // is no longer valid.
+    return detailsMapper.selectLockToken(updateId) != null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
index d590219..53f7a9b 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
@@ -19,11 +19,11 @@ import java.util.Set;
 import javax.annotation.Nullable;
 
 import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.ibatis.annotations.Param;
 
 /**
@@ -34,17 +34,26 @@ import org.apache.ibatis.annotations.Param;
 interface JobUpdateDetailsMapper {
 
   /**
-   * Inserts new {@link JobUpdate}.
+   * Inserts new job update.
    *
    * @param jobUpdate Job update to insert.
    */
   void insert(JobUpdate jobUpdate);
 
   /**
-   * Inserts {@link TaskConfig} entries associated with the current update.
+   * Inserts an association between an update and a lock.
+   *
+   * @param updateId Unique update identifier.
+   * @param lockToken Unique lock identifier, resulting from
+   *        {@link org.apache.aurora.scheduler.storage.entities.ILock#getToken()}.
+   */
+  void insertLockToken(@Param("updateId") String updateId, @Param("lockToken") String lockToken);
+
+  /**
+   * Inserts a task configuration entry for an update.
    *
    * @param updateId Update ID to insert task configs for.
-   * @param taskConfig {@link TaskConfig} to insert.
+   * @param taskConfig task configuration to insert.
    * @param isNew Flag to identify if the task config is existing {@code false} or
    *              desired {@code true}.
    * @param result Container for auto-generated ID of the inserted job update row.
@@ -82,7 +91,7 @@ interface JobUpdateDetailsMapper {
   void truncate();
 
   /**
-   * Gets all {@link JobUpdateSummary} matching the provided {@code query}.
+   * Gets all job update summaries matching the provided {@code query}.
    * All {@code query} fields are ANDed together.
    *
    * @param query Query to filter results by.
@@ -91,18 +100,27 @@ interface JobUpdateDetailsMapper {
   List<JobUpdateSummary> selectSummaries(JobUpdateQuery query);
 
   /**
-   * Gets {@link JobUpdateDetails} for the provided {@code updateId}.
+   * Gets details for the provided {@code updateId}.
    *
    * @param updateId Update ID to get.
-   * @return {@link JobUpdateDetails} instance, if it exists.
+   * @return job update details for the provided update ID, if it exists.
    */
   @Nullable
-  JobUpdateDetails selectDetails(String updateId);
+  StoredJobUpdateDetails selectDetails(String updateId);
 
   /**
-   * Gets all stored {@link JobUpdateDetails}.
+   * Gets all stored job update details.
    *
    * @return All stored job update details.
    */
-  List<JobUpdateDetails> selectAllDetails();
+  Set<StoredJobUpdateDetails> selectAllDetails();
+
+  /**
+   * Gets the token associated with an update.
+   *
+   * @param updateId Update identifier.
+   * @return The associated lock token, or {@code null} if no association exists.
+   */
+  @Nullable
+  String selectLockToken(String updateId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 342bab0..e3c20cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -461,7 +461,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
       case SAVE_JOB_UPDATE:
         writeBehindJobUpdateStore.saveJobUpdate(
-            IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()));
+            IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
+            op.getSaveJobUpdate().getLockToken());
         break;
 
       case SAVE_JOB_UPDATE_EVENT:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 3d291dd..8331c29 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -21,7 +21,6 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.collect.ImmutableSet;
-
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.util.BuildInfo;
 import com.twitter.common.util.Clock;
@@ -35,6 +34,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
 import org.apache.aurora.gen.storage.SchedulerMetadata;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
@@ -48,7 +48,6 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -68,6 +67,25 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
 
   private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
       new SnapshotField() {
+        // It's important for locks to be replayed first, since there are relations that expect
+        // references to be valid on insertion.
+        @Override
+        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
+        }
+
+        @Override
+        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+          store.getLockStore().deleteLocks();
+
+          if (snapshot.isSetLocks()) {
+            for (Lock lock : snapshot.getLocks()) {
+              store.getLockStore().saveLock(ILock.build(lock));
+            }
+          }
+        }
+      },
+      new SnapshotField() {
         @Override
         public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
           snapshot.setHostAttributes(
@@ -189,25 +207,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       new SnapshotField() {
         @Override
         public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
-        }
-
-        @Override
-        public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          store.getLockStore().deleteLocks();
-
-          if (snapshot.isSetLocks()) {
-            for (Lock lock : snapshot.getLocks()) {
-              store.getLockStore().saveLock(ILock.build(lock));
-            }
-          }
-        }
-      },
-      new SnapshotField() {
-        @Override
-        public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setJobUpdateDetails(IJobUpdateDetails.toBuildersSet(
-              store.getJobUpdateStore().fetchAllJobUpdateDetails()));
+          snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails());
         }
 
         @Override
@@ -216,8 +216,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
           updateStore.deleteAllUpdatesAndEvents();
 
           if (snapshot.isSetJobUpdateDetails()) {
-            for (JobUpdateDetails details : snapshot.getJobUpdateDetails()) {
-              updateStore.saveJobUpdate(IJobUpdate.build(details.getUpdate()));
+            for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
+              JobUpdateDetails details = storedDetails.getDetails();
+              updateStore.saveJobUpdate(
+                  IJobUpdate.build(details.getUpdate()),
+                  storedDetails.getLockToken());
 
               for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
                 updateStore.saveJobUpdateEvent(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 2915ff0..2b0a6a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -281,11 +281,11 @@ class WriteAheadStorage extends ForwardingStore implements
   }
 
   @Override
-  public void saveJobUpdate(IJobUpdate update) {
+  public void saveJobUpdate(IJobUpdate update, String lockToken) {
     requireNonNull(update);
 
-    write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder())));
-    jobUpdateStore.saveJobUpdate(update);
+    write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken)));
+    jobUpdateStore.saveJobUpdate(update, lockToken);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 0802ee0..7ef2885 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -1265,7 +1265,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     requireNonNull(mutableRequest);
     requireNonNull(session);
 
-    final ILock lock = ILock.build(requireNonNull(mutableLock));
     final Response response = emptyResponse();
 
     final SessionContext context;
@@ -1288,10 +1287,12 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
+        // TODO(wfarner): Move lock acquisition down into the update controller once introduced.
+        ILock lock;
         try {
-          lockManager.validateIfLocked(
+          lock = lockManager.acquireLock(
               ILockKey.build(LockKey.job(request.getJobKey().newBuilder())),
-              Optional.of(lock));
+              context.getIdentity());
         } catch (LockException e) {
           return addMessage(response, LOCK_ERROR, e);
         }
@@ -1299,7 +1300,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         // TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
 
         try {
-          String updateId = jobUpdater.startJobUpdate(request, context.getIdentity());
+          String updateId =
+              jobUpdater.startJobUpdate(request, context.getIdentity(), lock.getToken());
           return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
         } catch (UpdaterException e) {
           return addMessage(response, INVALID_REQUEST, e);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index 17c58b1..e59e6f7 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -17,6 +17,14 @@
         PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.aurora.scheduler.storage.db.JobUpdateDetailsMapper">
+  <sql id="selectUpdateIdentity">
+      (
+        SELECT id
+        FROM job_updates
+        WHERE update_id = #{updateId}
+      )
+  </sql>
+
   <insert id="insert">
     INSERT INTO job_updates (
       job_key_id,
@@ -49,17 +57,23 @@
     )
   </insert>
 
+  <insert id="insertLockToken">
+    INSERT INTO job_update_locks (
+      update_id,
+      lock_token
+    ) VALUES (
+      <include refid="selectUpdateIdentity"/>,
+      #{lockToken}
+    )
+  </insert>
+
   <insert id="insertTaskConfig" useGeneratedKeys="true" keyColumn="id" keyProperty="result.id">
     INSERT INTO job_update_configs (
       update_id,
       task_config,
       is_new
     ) VALUES (
-      (
-        SELECT id
-        FROM job_updates
-        WHERE update_id = #{updateId}
-      ),
+      <include refid="selectUpdateIdentity"/>,
       #{config, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.TaskConfigTypeHandler},
       #{isNew}
     )
@@ -85,11 +99,7 @@
       last
     ) VALUES
     <foreach item="element" collection="ranges" open="(" separator="),(" close=")">
-      (
-        SELECT id
-        FROM job_updates
-        WHERE update_id = #{updateId}
-      ),
+      <include refid="selectUpdateIdentity"/>,
       #{element.first},
       #{element.last}
     </foreach>
@@ -150,12 +160,12 @@
     <association property="configuration" resultMap="jobUpdateConfigurationMap" columnPrefix="juc_"/>
   </resultMap>
 
-  <resultMap id="jobUpdateDetailsMap" type="org.apache.aurora.gen.JobUpdateDetails">
+  <resultMap id="jobUpdateDetailsMap" type="org.apache.aurora.gen.storage.StoredJobUpdateDetails">
     <id column="u_id" />
-    <association property="update" resultMap="jobUpdateMap" />
+    <association property="details.update" resultMap="jobUpdateMap" />
     <!--Using notNullColumn attribute is required below as LEFT JOIN with empty right side
     will produce an empty row.-->
-    <collection property="updateEvents"
+    <collection property="details.updateEvents"
                 ofType="org.apache.aurora.gen.JobUpdateEvent"
                 columnPrefix="e_"
                 notNullColumn="id">
@@ -164,7 +174,7 @@
               column="status"
               typeHandler="org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler" />
     </collection>
-    <collection property="instanceEvents"
+    <collection property="details.instanceEvents"
                 ofType="org.apache.aurora.gen.JobInstanceUpdateEvent"
                 columnPrefix="i_"
                 notNullColumn="id">
@@ -262,7 +272,7 @@
       attributes used in associations.
       For example: jusm_just_status maps to JobUpdateSummary/JobUpdateState/status field.-->
   <sql id="unscoped_details_select">
-     SELECT
+    SELECT
       u.id AS u_id,
       u.id AS juc_juse_id,
       u.update_id AS jusm_update_id,
@@ -297,7 +307,8 @@
       ci.last AS juc_itc_r_last,
       io.id AS juc_juse_r_id,
       io.first AS juc_juse_r_first,
-      io.last AS juc_juse_r_last
+      io.last AS juc_juse_r_last,
+      l.lock_token AS lock_token
     FROM job_updates AS u
     INNER JOIN job_keys AS j ON j.id = u.job_key_id
     INNER JOIN job_update_configs AS cn ON cn.update_id = u.id AND cn.is_new = TRUE
@@ -307,6 +318,7 @@
     LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_id = u.id
     LEFT OUTER JOIN job_update_events AS e ON e.update_id = u.id
     LEFT OUTER JOIN job_instance_update_events AS i ON i.update_id = u.id
+    LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
   </sql>
 
   <select id="selectDetails" resultMap="jobUpdateDetailsMap">
@@ -315,10 +327,16 @@
     ORDER BY e_timestamp_ms, i_timestamp_ms
   </select>
 
-  <!--Order by ID to facilitate proper re-insertion during recovery.-->
   <select id="selectAllDetails" resultMap="jobUpdateDetailsMap">
     <include refid="unscoped_details_select"/>
-    ORDER BY u_id
+  </select>
+
+  <select id="selectLockToken" resultType="String">
+    SELECT
+      lock_token
+    FROM job_update_locks AS l
+    INNER JOIN job_updates u ON l.update_id = u.id
+    WHERE u.update_id = #{id}
   </select>
 
   <delete id="truncate">

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
index 1cf803f..a450a09 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
@@ -39,7 +39,8 @@ CREATE TABLE locks(
   timestampMs BIGINT NOT NULL,
   message VARCHAR,
 
-  UNIQUE(job_key_id)
+  UNIQUE(job_key_id),
+  UNIQUE(token)
 );
 
 CREATE TABLE quotas(
@@ -109,6 +110,15 @@ CREATE TABLE job_updates(
   UNIQUE(update_id)
 );
 
+CREATE TABLE job_update_locks(
+  id IDENTITY,
+  update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE,
+  lock_token VARCHAR NOT NULL REFERENCES locks(token) ON DELETE CASCADE,
+
+  UNIQUE(update_id),
+  UNIQUE(lock_token)
+);
+
 CREATE TABLE job_update_configs(
   id IDENTITY,
   update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage.thrift b/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 9f8378e..7e50245 100644
--- a/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -69,10 +69,18 @@ struct SaveHostAttributes {
 
 struct SaveJobUpdate {
   1: api.JobUpdate jobUpdate
+  2: string lockToken
+}
+
+struct StoredJobUpdateDetails {
+  1: api.JobUpdateDetails details
+  /** ID of the lock associated with this update. */
+  2: string lockToken
 }
 
 struct SaveJobUpdateEvent {
   1: api.JobUpdateEvent event
+  /** ID of the lock associated with this update. */
   2: string updateId
 }
 
@@ -147,7 +155,7 @@ struct Snapshot {
   6: SchedulerMetadata schedulerMetadata
   8: set<QuotaConfiguration> quotaConfigurations
   9: set<api.Lock> locks
-  10: set<api.JobUpdateDetails> jobUpdateDetails
+  10: set<StoredJobUpdateDetails> jobUpdateDetails
 }
 
 // A message header that calls out the number of expected FrameChunks to follow to form a complete

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage_local.thrift b/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
deleted file mode 100644
index becfd75..0000000
--- a/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Thrift structures for a local log storage system, for use in simulated environments.
-namespace java org.apache.aurora.gen.test
-
-struct LogRecord {
-  1: binary contents
-}
-
-struct FileLogContents {
-  1: map<i64, LogRecord> records
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
index 1f985fb..90b4e8a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
@@ -59,6 +59,7 @@ public class JobUpdaterImplTest extends EasyMockTest {
   private static final IJobKey JOB = JobKeys.from("role", "env", "name");
   private static final Identity IDENTITY = new Identity("role", "user");
   private static final Long TIMESTAMP = 1234L;
+  private static final String LOCK = "lock token";
 
   private JobUpdater updater;
   private StorageTestUtil storageUtil;
@@ -106,12 +107,14 @@ public class JobUpdaterImplTest extends EasyMockTest {
         oldTask6,
         oldTask7);
 
-    storageUtil.updateStore.saveJobUpdate(update);
+    storageUtil.updateStore.saveJobUpdate(update, LOCK);
     storageUtil.updateStore.saveJobUpdateEvent(buildUpdateEvent(), UPDATE_ID);
 
     control.replay();
 
-    assertEquals(UPDATE_ID, updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser()));
+    assertEquals(
+        UPDATE_ID,
+        updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK));
   }
 
   @Test(expected = UpdaterException.class)
@@ -128,12 +131,12 @@ public class JobUpdaterImplTest extends EasyMockTest {
 
     storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB).active());
 
-    storageUtil.updateStore.saveJobUpdate(update);
+    storageUtil.updateStore.saveJobUpdate(update, LOCK);
     expectLastCall().andThrow(new Storage.StorageException("fail"));
 
     control.replay();
 
-    updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser());
+    updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK);
   }
 
   private static IJobUpdateRequest buildJobRequest(IJobUpdate update) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 2f2c3e1..5ac15ec 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -27,7 +27,6 @@ import com.twitter.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -35,6 +34,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
 import org.apache.aurora.gen.storage.SchedulerMetadata;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
@@ -165,7 +165,7 @@ public class RecoveryTest extends EasyMockTest {
         .setQuotaConfigurations(ImmutableSet.<QuotaConfiguration>of())
         .setTasks(ImmutableSet.<ScheduledTask>builder().add(tasks).build())
         .setLocks(ImmutableSet.<Lock>of())
-        .setJobUpdateDetails(ImmutableSet.<JobUpdateDetails>of());
+        .setJobUpdateDetails(ImmutableSet.<StoredJobUpdateDetails>of());
   }
 
   private static ScheduledTask makeTask(String taskId) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index f695b85..f669dbe 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -15,6 +15,7 @@
 package org.apache.aurora.scheduler.storage.db;
 
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -34,8 +35,11 @@ import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateState;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -49,11 +53,14 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class DBJobUpdateStoreTest {
 
@@ -79,13 +86,13 @@ public class DBJobUpdateStoreTest {
     String updateId1 = "u1";
     String updateId2 = "u2";
 
-    IJobUpdate update1 = makeJobUpdate(JOB, updateId1);
-    IJobUpdate update2 = makeJobUpdate(JOB, updateId2);
+    IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1);
+    IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2);
 
-    saveUpdate(update1);
+    saveUpdate(update1, "lock1");
     assertEquals(populateExpected(update1), getUpdateDetails(updateId1).get().getUpdate());
 
-    saveUpdate(update2);
+    saveUpdate(update2, "lock2");
     assertEquals(populateExpected(update1), getUpdateDetails(updateId1).get().getUpdate());
   }
 
@@ -100,7 +107,7 @@ public class DBJobUpdateStoreTest {
     IJobUpdate expected = IJobUpdate.build(builder);
 
     // Save with empty overrides.
-    saveUpdate(expected);
+    saveUpdate(expected, "lock");
     assertEquals(populateExpected(expected), getUpdateDetails(updateId).get().getUpdate());
   }
 
@@ -116,7 +123,7 @@ public class DBJobUpdateStoreTest {
 
     // Save with null overrides.
     builder.getConfiguration().getSettings().setUpdateOnlyTheseInstances(null);
-    saveUpdate(IJobUpdate.build(builder));
+    saveUpdate(IJobUpdate.build(builder), "lock");
     assertEquals(populateExpected(expected), getUpdateDetails(updateId).get().getUpdate());
   }
 
@@ -125,8 +132,8 @@ public class DBJobUpdateStoreTest {
     String updateId = "u1";
     IJobUpdate update = makeJobUpdate(JOB, updateId);
 
-    saveUpdate(update);
-    saveUpdate(update);
+    saveUpdate(update, "lock1");
+    saveUpdate(update, "lock2");
   }
 
   @Test
@@ -136,7 +143,7 @@ public class DBJobUpdateStoreTest {
     IJobUpdateEvent event1 = makeJobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 124L);
     IJobUpdateEvent event2 = makeJobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 125L);
 
-    saveUpdate(update);
+    saveUpdate(update, "lock1");
     assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
     assertEquals(ImmutableList.of(FIRST_EVENT), getUpdateDetails(updateId).get().getUpdateEvents());
 
@@ -161,7 +168,7 @@ public class DBJobUpdateStoreTest {
     IJobInstanceUpdateEvent event1 = makeJobInstanceEvent(0, 125L, JobUpdateAction.ADD_INSTANCE);
     IJobInstanceUpdateEvent event2 = makeJobInstanceEvent(1, 126L, JobUpdateAction.ADD_INSTANCE);
 
-    saveUpdate(update);
+    saveUpdate(update, "lock");
     assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
     assertEquals(0, getUpdateDetails(updateId).get().getInstanceEvents().size());
 
@@ -201,7 +208,7 @@ public class DBJobUpdateStoreTest {
         567L,
         567L);
 
-    saveUpdate(update);
+    saveUpdate(update, "lock1");
 
     // Assert state fields were ignored.
     assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
@@ -210,11 +217,12 @@ public class DBJobUpdateStoreTest {
   @Test
   public void testSaveJobUpdateWithoutEventFailsSelect() {
     final String updateId = "u3";
-    storage.write(new MutateWork.Quiet<Void>() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      public Void apply(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdate(makeJobUpdate(JOB, updateId));
-        return null;
+      public void execute(MutableStoreProvider storeProvider) {
+        IJobUpdate update = makeJobUpdate(JOB, updateId);
+        storeProvider.getLockStore().saveLock(makeLock(update.getSummary().getJobKey(), "lock1"));
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, "lock1");
       }
     });
     assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
@@ -224,16 +232,18 @@ public class DBJobUpdateStoreTest {
   public void testMultipleJobDetails() {
     String updateId1 = "u1";
     String updateId2 = "u2";
-    IJobUpdateDetails details1 = makeJobDetails(makeJobUpdate(JOB, updateId1));
-    IJobUpdateDetails details2 = makeJobDetails(makeJobUpdate(JOB, updateId2));
+    IJobUpdateDetails details1 =
+        makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1));
+    IJobUpdateDetails details2 =
+        makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2));
 
-    saveUpdate(details1.getUpdate());
-    saveUpdate(details2.getUpdate());
+    saveUpdate(details1.getUpdate(), "lock1");
+    saveUpdate(details2.getUpdate(), "lock2");
 
     details1 = updateJobDetails(populateExpected(details1.getUpdate()), FIRST_EVENT);
     details2 = updateJobDetails(populateExpected(details2.getUpdate()), FIRST_EVENT);
-    assertEquals(details1, getUpdateDetails(updateId1).get());
-    assertEquals(details2, getUpdateDetails(updateId2).get());
+    assertEquals(Optional.of(details1), getUpdateDetails(updateId1));
+    assertEquals(Optional.of(details2), getUpdateDetails(updateId2));
 
     IJobUpdateEvent jEvent11 = makeJobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 456L);
     IJobUpdateEvent jEvent12 = makeJobUpdateEvent(JobUpdateStatus.ERROR, 457L);
@@ -263,10 +273,14 @@ public class DBJobUpdateStoreTest {
         populateExpected(details2.getUpdate(), JobUpdateStatus.ABORTED, CREATED_MS, 568L),
         ImmutableList.of(FIRST_EVENT, jEvent21, jEvent22), ImmutableList.of(iEvent21, iEvent22));
 
-    assertEquals(details1, getUpdateDetails(updateId1).get());
-    assertEquals(details2, getUpdateDetails(updateId2).get());
+    assertEquals(Optional.of(details1), getUpdateDetails(updateId1));
+    assertEquals(Optional.of(details2), getUpdateDetails(updateId2));
 
-    assertEquals(ImmutableList.of(details1, details2), getAllUpdateDetails());
+    assertEquals(
+        ImmutableSet.of(
+            new StoredJobUpdateDetails(details1.newBuilder(), "lock1"),
+            new StoredJobUpdateDetails(details2.newBuilder(), "lock2")),
+        getAllUpdateDetails());
   }
 
   @Test
@@ -278,7 +292,7 @@ public class DBJobUpdateStoreTest {
     IJobInstanceUpdateEvent instanceEvent = IJobInstanceUpdateEvent.build(
         new JobInstanceUpdateEvent(0, 125L, JobUpdateAction.ADD_INSTANCE));
 
-    saveUpdate(update);
+    saveUpdate(update, "lock");
     saveJobEvent(updateEvent, updateId);
     saveJobInstanceEvent(instanceEvent, updateId);
     assertEquals(
@@ -291,14 +305,107 @@ public class DBJobUpdateStoreTest {
     assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
   }
 
+  @Test(expected = StorageException.class)
+  public void testSaveUpdateWithoutLock() {
+    final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, "lock");
+      }
+    });
+  }
+
+  @Test(expected = StorageException.class)
+  public void testSaveTwoUpdatesForOneJob() {
+    final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+    saveUpdate(update, "lock1");
+    saveUpdate(update, "lock2");
+  }
+
+  @Test(expected = StorageException.class)
+  public void testSaveTwoUpdatesSameJobKey() {
+    final IJobUpdate update1 = makeJobUpdate(JOB, "update1");
+    final IJobUpdate update2 = makeJobUpdate(JOB, "update2");
+    saveUpdate(update1, "lock1");
+    saveUpdate(update2, "lock1");
+  }
+
+  @Test
+  public void testLockCleared() {
+    final IJobUpdate update = makeJobUpdate(JOB, "update1");
+    saveUpdate(update, "lock1");
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getLockStore().removeLock(
+            makeLock(update.getSummary().getJobKey(), "lock1").getKey());
+      }
+    });
+
+    assertEquals(
+        Optional.of(updateJobDetails(populateExpected(update), FIRST_EVENT)),
+        getUpdateDetails("update1"));
+    assertEquals(
+        ImmutableSet.of(
+            new StoredJobUpdateDetails(
+                updateJobDetails(populateExpected(update), FIRST_EVENT).newBuilder(),
+                null)),
+        getAllUpdateDetails());
+
+    assertEquals(
+        ImmutableList.of(populateExpected(update).getSummary()),
+        getSummaries(new JobUpdateQuery().setUpdateId("update1")));
+
+    // If the lock has been released for this job, we can start another update.
+    saveUpdate(makeJobUpdate(JOB, "update2"), "lock2");
+  }
+
+  @Test
+  public void testIsActive() {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        final IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), "update1");
+        final IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), "update2");
+        saveUpdate(update1, "lock1");
+        assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
+        assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+
+        saveUpdate(update2, "lock2");
+        assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
+        assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+
+        storeProvider.getLockStore().removeLock(
+            makeLock(update1.getSummary().getJobKey(), "lock1").getKey());
+        assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
+        assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+
+        storeProvider.getLockStore().removeLock(
+            makeLock(update2.getSummary().getJobKey(), "lock2").getKey());
+        assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
+        assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+      }
+    });
+  }
+
   @Test
   public void testGetSummaries() {
-    IJobKey job2 = JobKeys.from("role", "env", "name");
-    IJobUpdateSummary s1 = saveSummary(JOB, "u1", 1230L, JobUpdateStatus.ROLLED_BACK, "user");
-    IJobUpdateSummary s2 = saveSummary(JOB, "u2", 1231L, JobUpdateStatus.ABORTED, "user");
-    IJobUpdateSummary s3 = saveSummary(JOB, "u3", 1239L, JobUpdateStatus.ERROR, "user2");
-    IJobUpdateSummary s4 = saveSummary(JOB, "u4", 1234L, JobUpdateStatus.ROLL_BACK_PAUSED, "user3");
-    IJobUpdateSummary s5 = saveSummary(job2, "u5", 1235L, JobUpdateStatus.ROLLING_FORWARD, "user4");
+    String role1 = "role1";
+    IJobKey job1 = JobKeys.from(role1, "env", "name1");
+    IJobKey job2 = JobKeys.from(role1, "env", "name2");
+    IJobKey job3 = JobKeys.from(role1, "env", "name3");
+    IJobKey job4 = JobKeys.from(role1, "env", "name4");
+    IJobKey job5 = JobKeys.from("role", "env", "name5");
+    IJobUpdateSummary s1 =
+        saveSummary(job1, "u1", 1230L, JobUpdateStatus.ROLLED_BACK, "user", "lock1");
+    IJobUpdateSummary s2 = saveSummary(job2, "u2", 1231L, JobUpdateStatus.ABORTED, "user", "lock2");
+    IJobUpdateSummary s3 = saveSummary(job3, "u3", 1239L, JobUpdateStatus.ERROR, "user2", "lock3");
+    IJobUpdateSummary s4 =
+        saveSummary(job4, "u4", 1234L, JobUpdateStatus.ROLL_BACK_PAUSED, "user3", "lock4");
+    IJobUpdateSummary s5 =
+        saveSummary(job5, "u5", 1235L, JobUpdateStatus.ROLLING_FORWARD, "user4", "lock5");
 
     // Test empty query returns all.
     assertEquals(ImmutableList.of(s1, s2, s4, s5, s3), getSummaries(new JobUpdateQuery()));
@@ -309,12 +416,12 @@ public class DBJobUpdateStoreTest {
     // Test query by role.
     assertEquals(
         ImmutableList.of(s1, s2, s4, s3),
-        getSummaries(new JobUpdateQuery().setRole(JOB.getRole())));
+        getSummaries(new JobUpdateQuery().setRole(role1)));
 
     // Test query by job key.
     assertEquals(
         ImmutableList.of(s5),
-        getSummaries(new JobUpdateQuery().setJobKey(job2.newBuilder())));
+        getSummaries(new JobUpdateQuery().setJobKey(job5.newBuilder())));
 
     // Test query by user.
     assertEquals(ImmutableList.of(s1, s2), getSummaries(new JobUpdateQuery().setUser("user")));
@@ -361,10 +468,10 @@ public class DBJobUpdateStoreTest {
     });
   }
 
-  private List<IJobUpdateDetails> getAllUpdateDetails() {
-    return storage.consistentRead(new Quiet<List<IJobUpdateDetails>>() {
+  private Set<StoredJobUpdateDetails> getAllUpdateDetails() {
+    return storage.consistentRead(new Quiet<Set<StoredJobUpdateDetails>>() {
       @Override
-      public List<IJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
+      public Set<StoredJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().fetchAllJobUpdateDetails();
       }
     });
@@ -380,45 +487,50 @@ public class DBJobUpdateStoreTest {
     });
   }
 
-  private void saveUpdate(final IJobUpdate update) {
-    storage.write(new MutateWork.Quiet<Void>() {
+  private static ILock makeLock(IJobKey jobKey, String lockToken) {
+    return ILock.build(new Lock()
+        .setKey(LockKey.job(jobKey.newBuilder()))
+        .setToken(lockToken)
+        .setTimestampMs(100)
+        .setUser("fake user"));
+  }
+
+  private void saveUpdate(final IJobUpdate update, final String lockToken) {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      public Void apply(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdate(update);
+      public void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getLockStore().saveLock(makeLock(update.getSummary().getJobKey(), lockToken));
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
         storeProvider.getJobUpdateStore().saveJobUpdateEvent(
             FIRST_EVENT,
             update.getSummary().getUpdateId());
-        return null;
       }
     });
   }
 
   private void saveJobEvent(final IJobUpdateEvent event, final String updateId) {
-    storage.write(new MutateWork.Quiet<Void>() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      public Void apply(MutableStoreProvider storeProvider) {
+      public void execute(MutableStoreProvider storeProvider) {
         storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
-        return null;
       }
     });
   }
 
   private void saveJobInstanceEvent(final IJobInstanceUpdateEvent event, final String updateId) {
-    storage.write(new MutateWork.Quiet<Void>() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      public Void apply(MutableStoreProvider storeProvider) {
+      public void execute(MutableStoreProvider storeProvider) {
         storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(event, updateId);
-        return null;
       }
     });
   }
 
   private void truncateUpdates() {
-    storage.write(new MutateWork.Quiet<Void>() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      public Void apply(MutableStoreProvider storeProvider) {
+      public void execute(MutableStoreProvider storeProvider) {
         storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
-        return null;
       }
     });
   }
@@ -458,7 +570,7 @@ public class DBJobUpdateStoreTest {
   private IJobUpdateDetails makeJobDetails(IJobUpdate update) {
     return updateJobDetails(
         update,
-        ImmutableList.<IJobUpdateEvent>of(FIRST_EVENT),
+        ImmutableList.of(FIRST_EVENT),
         ImmutableList.<IJobInstanceUpdateEvent>of());
   }
 
@@ -492,7 +604,8 @@ public class DBJobUpdateStoreTest {
       String updateId,
       Long modifiedTimestampMs,
       JobUpdateStatus status,
-      String user) {
+      String user,
+      String lockToken) {
 
     IJobUpdateSummary summary = IJobUpdateSummary.build(new JobUpdateSummary()
         .setUpdateId(updateId)
@@ -500,7 +613,7 @@ public class DBJobUpdateStoreTest {
         .setUser(user));
 
     IJobUpdate update = makeJobUpdate(summary);
-    saveUpdate(update);
+    saveUpdate(update, lockToken);
     saveJobEvent(makeJobUpdateEvent(status, modifiedTimestampMs), updateId);
     return populateExpected(update, status, CREATED_MS, modifiedTimestampMs).getSummary();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
index ae4cef4..e9b210f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
@@ -85,10 +85,10 @@ public class DbLockStoreTest {
     });
   }
 
-  private static ILock makeLock(JobKey key) {
+  private static ILock makeLock(JobKey key, String token) {
     return ILock.build(new Lock()
       .setKey(LockKey.job(key))
-      .setToken("lock1")
+      .setToken(token)
       .setUser("testUser")
       .setMessage("Test message")
       .setTimestampMs(12345L));
@@ -108,8 +108,8 @@ public class DbLockStoreTest {
     String job1 = "testJob1";
     String job2 = "testJob2";
 
-    ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder());
-    ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder());
+    ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder(), "token1");
+    ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder(), "token2");
 
     saveLocks(lock1, lock2);
     assertLocks(lock1, lock2);
@@ -126,7 +126,7 @@ public class DbLockStoreTest {
     String env = "testEnv";
     String job = "testJob";
 
-    ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder());
+    ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder(), "token1");
 
     saveLocks(lock);
     try {
@@ -145,7 +145,7 @@ public class DbLockStoreTest {
     String env = "testEnv";
     String job = "testJob";
 
-    ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder());
+    ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder(), "token1");
 
     saveLocks(lock);
     removeLocks(lock);
@@ -163,8 +163,8 @@ public class DbLockStoreTest {
     String env = "testEnv";
     String job = "testJob";
 
-    ILock lock1 = makeLock(JobKeys.from(role1, env, job).newBuilder());
-    ILock lock2 = makeLock(JobKeys.from(role2, env, job).newBuilder());
+    ILock lock1 = makeLock(JobKeys.from(role1, env, job).newBuilder(), "token1");
+    ILock lock2 = makeLock(JobKeys.from(role2, env, job).newBuilder(), "token2");
 
     assertEquals(Optional.<ILock>absent(), getLock(lock1.getKey()));
     assertEquals(Optional.<ILock>absent(), getLock(lock2.getKey()));
@@ -189,8 +189,8 @@ public class DbLockStoreTest {
     String job1 = "testJob1";
     String job2 = "testJob2";
 
-    ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder());
-    ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder());
+    ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder(), "token1");
+    ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder(), "token2");
 
     saveLocks(lock1, lock2);
     assertLocks(lock1, lock2);
@@ -205,4 +205,18 @@ public class DbLockStoreTest {
 
     assertLocks();
   }
+
+  @Test
+  public void testDuplicateToken() throws Exception {
+    ILock lock = makeLock(JobKeys.from("role", "env", "job1").newBuilder(), "token1");
+    saveLocks(lock);
+    try {
+      saveLocks(makeLock(JobKeys.from("role", "env", "job2").newBuilder(), "token1"));
+      fail();
+    } catch (StorageException e) {
+      // Expected.
+    }
+
+    assertLocks(lock);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index ebcb910..78798f2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -763,19 +763,21 @@ public class LogStorageTest extends EasyMockTest {
                 .setTask(new TaskConfig())
                 .setInstances(ImmutableSet.of(new Range(0, 3)))))
             .setSettings(new JobUpdateSettings())));
+    final String lockToken = "token";
 
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
         storageUtil.expectWriteOperation();
-        storageUtil.updateStore.saveJobUpdate(update);
-        streamMatcher.expectTransaction(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder())))
+        storageUtil.updateStore.saveJobUpdate(update, lockToken);
+        streamMatcher.expectTransaction(
+            Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken)))
             .andReturn(position);
       }
 
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdate(update);
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
       }
     }.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
index bee9c9c..c752e66 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
@@ -42,6 +42,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
 import org.apache.aurora.gen.storage.SchedulerMetadata;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
@@ -117,8 +118,10 @@ public class SnapshotStoreImplTest extends EasyMockTest {
         .andReturn(ImmutableSet.of(IJobConfiguration.build(job.getJobConfiguration())));
     expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(frameworkId));
     expect(storageUtil.lockStore.fetchLocks()).andReturn(ImmutableSet.of(lock));
+    String lockToken = "token";
     expect(storageUtil.updateStore.fetchAllJobUpdateDetails())
-        .andReturn(ImmutableList.of(updateDetails));
+        .andReturn(ImmutableSet.of(
+            new StoredJobUpdateDetails(updateDetails.newBuilder(), lockToken)));
 
     expectDataWipe();
     storageUtil.taskStore.saveTasks(tasks);
@@ -129,7 +132,7 @@ public class SnapshotStoreImplTest extends EasyMockTest {
         IJobConfiguration.build(job.getJobConfiguration()));
     storageUtil.schedulerStore.saveFrameworkId(frameworkId);
     storageUtil.lockStore.saveLock(lock);
-    storageUtil.updateStore.saveJobUpdate(updateDetails.getUpdate());
+    storageUtil.updateStore.saveJobUpdate(updateDetails.getUpdate(), lockToken);
     storageUtil.updateStore.saveJobUpdateEvent(
         Iterables.getOnlyElement(updateDetails.getUpdateEvents()),
         updateId);
@@ -147,7 +150,8 @@ public class SnapshotStoreImplTest extends EasyMockTest {
         .setJobs(ImmutableSet.of(job))
         .setSchedulerMetadata(metadata)
         .setLocks(ImmutableSet.of(lock.newBuilder()))
-        .setJobUpdateDetails(ImmutableSet.of(updateDetails.newBuilder()));
+        .setJobUpdateDetails(ImmutableSet.of(
+            new StoredJobUpdateDetails(updateDetails.newBuilder(), lockToken)));
 
     assertEquals(expected, snapshotStore.createSnapshot());
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 7dbf97a..649afa2 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -164,7 +164,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final SessionKey SESSION = new SessionKey();
   private static final IJobKey JOB_KEY = JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, JOB_NAME);
   private static final ILockKey LOCK_KEY = ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
-  private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
+  private static final ILock LOCK =
+      ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()).setToken("token"));
   private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
   private static final Lock DEFAULT_LOCK = null;
   private static final String TASK_ID = "task_id";
@@ -1811,8 +1812,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testStartUpdate() throws Exception {
     JobUpdateRequest request = createJobRequest(populatedTask());
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER)).andReturn(UPDATE_ID);
+    expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
+    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
+        .andReturn(UPDATE_ID);
 
     control.replay();
 
@@ -1843,8 +1845,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testStartUpdateFailsLockValidation() throws Exception {
     JobUpdateRequest request = createJobRequest(populatedTask());
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    expectLastCall().andThrow(new LockException("lock failed"));
+    expect(lockManager.acquireLock(LOCK_KEY, USER)).andThrow(new LockException("lock failed"));
 
     control.replay();
 
@@ -1855,8 +1856,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testStartUpdateFailsInUpdater() throws Exception {
     JobUpdateRequest request = createJobRequest(populatedTask());
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER))
+    expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
+    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
         .andThrow(new UpdaterException("failed update"));
 
     control.replay();