You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/15 06:16:49 UTC
git commit: Store a lock association with job updates.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 8d5a62cfd -> 1c4f52782
Store a lock association with job updates.
Bugs closed: AURORA-613
Reviewed at https://reviews.apache.org/r/24727/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1c4f5278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1c4f5278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1c4f5278
Branch: refs/heads/master
Commit: 1c4f52782870087aee0187ea9797c22cc624a026
Parents: 8d5a62c
Author: Bill Farner <wf...@apache.org>
Authored: Thu Aug 14 20:19:54 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Aug 14 20:19:54 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/state/JobUpdater.java | 4 +-
.../aurora/scheduler/state/JobUpdaterImpl.java | 8 +-
.../scheduler/storage/ForwardingStore.java | 8 +-
.../scheduler/storage/JobUpdateStore.java | 17 +-
.../scheduler/storage/db/DBJobUpdateStore.java | 27 ++-
.../storage/db/JobUpdateDetailsMapper.java | 38 +++-
.../scheduler/storage/log/LogStorage.java | 3 +-
.../storage/log/SnapshotStoreImpl.java | 49 +++--
.../storage/log/WriteAheadStorage.java | 6 +-
.../thrift/SchedulerThriftInterface.java | 10 +-
.../storage/db/JobUpdateDetailsMapper.xml | 54 +++--
.../aurora/scheduler/storage/db/schema.sql | 12 +-
.../thrift/org/apache/aurora/gen/storage.thrift | 10 +-
.../org/apache/aurora/gen/storage_local.thrift | 24 --
.../scheduler/state/JobUpdaterImplTest.java | 11 +-
.../scheduler/storage/backup/RecoveryTest.java | 4 +-
.../storage/db/DBJobUpdateStoreTest.java | 219 ++++++++++++++-----
.../scheduler/storage/db/DbLockStoreTest.java | 34 ++-
.../scheduler/storage/log/LogStorageTest.java | 8 +-
.../storage/log/SnapshotStoreImplTest.java | 10 +-
.../thrift/SchedulerThriftInterfaceTest.java | 15 +-
21 files changed, 390 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
index f153444..eb472e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
@@ -25,10 +25,12 @@ public interface JobUpdater {
*
* @param request Job update request.
* @param user User who initiated the update.
+ * @param lockToken Token for the lock held for this update.
* @return Saved job update ID.
* @throws UpdaterException Throws if update fails to start for any reason.
*/
- String startJobUpdate(IJobUpdateRequest request, String user) throws UpdaterException;
+ String startJobUpdate(IJobUpdateRequest request, String user, String lockToken)
+ throws UpdaterException;
/**
* Thrown when job update related operation failed.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
index 6bcdf62..f21f27c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
@@ -101,8 +101,10 @@ class JobUpdaterImpl implements JobUpdater {
}
@Override
- public String startJobUpdate(final IJobUpdateRequest request, final String user)
- throws UpdaterException {
+ public String startJobUpdate(
+ final IJobUpdateRequest request,
+ final String user,
+ final String lockToken) throws UpdaterException {
return storage.write(new MutateWork<String, UpdaterException>() {
@Override
@@ -125,7 +127,7 @@ class JobUpdaterImpl implements JobUpdater {
.setTimestampMs(clock.nowMillis()));
try {
- storeProvider.getJobUpdateStore().saveJobUpdate(update);
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
} catch (StorageException e) {
throw new UpdaterException("Failed to start update.", e);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 3f083d6..b894a71 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -20,6 +20,7 @@ import java.util.Set;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -150,7 +151,12 @@ public class ForwardingStore implements
}
@Override
- public List<IJobUpdateDetails> fetchAllJobUpdateDetails() {
+ public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
return jobUpdateStore.fetchAllJobUpdateDetails();
}
+
+ @Override
+ public boolean isActive(String updateId) {
+ return jobUpdateStore.isActive(updateId);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index c05833f..599dbd8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -14,9 +14,11 @@
package org.apache.aurora.scheduler.storage;
import java.util.List;
+import java.util.Set;
import com.google.common.base.Optional;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
@@ -47,10 +49,20 @@ public interface JobUpdateStore {
/**
* Fetches a read-only view of all job update details available in the store.
+ * TODO(wfarner): Generate immutable wrappers for storage.thrift structs, use an immutable object
+ * here.
*
* @return A read-only view of all job update details.
*/
- List<IJobUpdateDetails> fetchAllJobUpdateDetails();
+ Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
+
+ /**
+ * Determines whether an update ID represents a currently-active job update.
+ *
+ * @param updateId Job update ID.
+ * @return {@code true} if this update has exclusive access to the job, otherwise {@code false}.
+ */
+ boolean isActive(String updateId);
interface Mutable extends JobUpdateStore {
@@ -69,8 +81,9 @@ public interface JobUpdateStore {
* without having at least one {@link IJobUpdateEvent} present in the store will return empty.
*
* @param update Update to save.
+ * @param lockToken UUID identifying the lock associated with this update.
*/
- void saveJobUpdate(IJobUpdate update);
+ void saveJobUpdate(IJobUpdate update, String lockToken);
/**
* Saves a new job update event.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index d659aa1..ec9b37c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -20,8 +20,9 @@ import javax.inject.Inject;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
-import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
@@ -58,11 +59,15 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
}
@Override
- public void saveJobUpdate(IJobUpdate update) {
+ public void saveJobUpdate(IJobUpdate update, String lockToken) {
+ requireNonNull(update);
+ requireNonNull(lockToken);
+
jobKeyMapper.merge(update.getSummary().getJobKey().newBuilder());
detailsMapper.insert(update.newBuilder());
String updateId = update.getSummary().getUpdateId();
+ detailsMapper.insertLockToken(updateId, lockToken);
// Insert optional instance update overrides.
Set<IRange> instanceOverrides =
@@ -113,16 +118,24 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
@Override
public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final String updateId) {
return Optional.fromNullable(detailsMapper.selectDetails(updateId))
- .transform(new Function<JobUpdateDetails, IJobUpdateDetails>() {
+ .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
@Override
- public IJobUpdateDetails apply(JobUpdateDetails input) {
- return IJobUpdateDetails.build(input);
+ public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
+ return IJobUpdateDetails.build(input.getDetails());
}
});
}
@Override
- public List<IJobUpdateDetails> fetchAllJobUpdateDetails() {
- return IJobUpdateDetails.listFromBuilders(detailsMapper.selectAllDetails());
+ public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+ return ImmutableSet.copyOf(detailsMapper.selectAllDetails());
+ }
+
+ @Override
+ public boolean isActive(String updateId) {
+ // We assume here that cascading deletes will cause a lock-update associative row to disappear
+ // when the lock is invalidated. This further assumes that a lock row is deleted when a lock
+ // is no longer valid.
+ return detailsMapper.selectLockToken(updateId) != null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
index d590219..53f7a9b 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
@@ -19,11 +19,11 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.ibatis.annotations.Param;
/**
@@ -34,17 +34,26 @@ import org.apache.ibatis.annotations.Param;
interface JobUpdateDetailsMapper {
/**
- * Inserts new {@link JobUpdate}.
+ * Inserts new job update.
*
* @param jobUpdate Job update to insert.
*/
void insert(JobUpdate jobUpdate);
/**
- * Inserts {@link TaskConfig} entries associated with the current update.
+ * Inserts an association between an update and a lock.
+ *
+ * @param updateId Unique update identifier.
+ * @param lockToken Unique lock identifier, resulting from
+ * {@link org.apache.aurora.scheduler.storage.entities.ILock#getToken()}.
+ */
+ void insertLockToken(@Param("updateId") String updateId, @Param("lockToken") String lockToken);
+
+ /**
+ * Inserts a task configuration entry for an update.
*
* @param updateId Update ID to insert task configs for.
- * @param taskConfig {@link TaskConfig} to insert.
+ * @param taskConfig Task configuration to insert.
* @param isNew Flag to identify if the task config is existing {@code false} or
* desired {@code true}.
* @param result Container for auto-generated ID of the inserted job update row.
@@ -82,7 +91,7 @@ interface JobUpdateDetailsMapper {
void truncate();
/**
- * Gets all {@link JobUpdateSummary} matching the provided {@code query}.
+ * Gets all job update summaries matching the provided {@code query}.
* All {@code query} fields are ANDed together.
*
* @param query Query to filter results by.
@@ -91,18 +100,27 @@ interface JobUpdateDetailsMapper {
List<JobUpdateSummary> selectSummaries(JobUpdateQuery query);
/**
- * Gets {@link JobUpdateDetails} for the provided {@code updateId}.
+ * Gets details for the provided {@code updateId}.
*
* @param updateId Update ID to get.
- * @return {@link JobUpdateDetails} instance, if it exists.
+ * @return job update details for the provided update ID, if it exists.
*/
@Nullable
- JobUpdateDetails selectDetails(String updateId);
+ StoredJobUpdateDetails selectDetails(String updateId);
/**
- * Gets all stored {@link JobUpdateDetails}.
+ * Gets all stored job update details.
*
* @return All stored job update details.
*/
- List<JobUpdateDetails> selectAllDetails();
+ Set<StoredJobUpdateDetails> selectAllDetails();
+
+ /**
+ * Gets the lock token associated with an update.
+ *
+ * @param updateId Update identifier.
+ * @return The associated lock token, or {@code null} if no association exists.
+ */
+ @Nullable
+ String selectLockToken(String updateId);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 342bab0..e3c20cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -461,7 +461,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
case SAVE_JOB_UPDATE:
writeBehindJobUpdateStore.saveJobUpdate(
- IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()));
+ IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
+ op.getSaveJobUpdate().getLockToken());
break;
case SAVE_JOB_UPDATE_EVENT:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 3d291dd..8331c29 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -21,7 +21,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.collect.ImmutableSet;
-
import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.util.BuildInfo;
import com.twitter.common.util.Clock;
@@ -35,6 +34,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.SnapshotStore;
@@ -48,7 +48,6 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -68,6 +67,25 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
new SnapshotField() {
+ // It's important for locks to be replayed first, since there are relations that expect
+ // references to be valid on insertion.
+ @Override
+ public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
+ }
+
+ @Override
+ public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ store.getLockStore().deleteLocks();
+
+ if (snapshot.isSetLocks()) {
+ for (Lock lock : snapshot.getLocks()) {
+ store.getLockStore().saveLock(ILock.build(lock));
+ }
+ }
+ }
+ },
+ new SnapshotField() {
@Override
public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
snapshot.setHostAttributes(
@@ -189,25 +207,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
new SnapshotField() {
@Override
public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
- }
-
- @Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- store.getLockStore().deleteLocks();
-
- if (snapshot.isSetLocks()) {
- for (Lock lock : snapshot.getLocks()) {
- store.getLockStore().saveLock(ILock.build(lock));
- }
- }
- }
- },
- new SnapshotField() {
- @Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setJobUpdateDetails(IJobUpdateDetails.toBuildersSet(
- store.getJobUpdateStore().fetchAllJobUpdateDetails()));
+ snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails());
}
@Override
@@ -216,8 +216,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
updateStore.deleteAllUpdatesAndEvents();
if (snapshot.isSetJobUpdateDetails()) {
- for (JobUpdateDetails details : snapshot.getJobUpdateDetails()) {
- updateStore.saveJobUpdate(IJobUpdate.build(details.getUpdate()));
+ for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
+ JobUpdateDetails details = storedDetails.getDetails();
+ updateStore.saveJobUpdate(
+ IJobUpdate.build(details.getUpdate()),
+ storedDetails.getLockToken());
for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
updateStore.saveJobUpdateEvent(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 2915ff0..2b0a6a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -281,11 +281,11 @@ class WriteAheadStorage extends ForwardingStore implements
}
@Override
- public void saveJobUpdate(IJobUpdate update) {
+ public void saveJobUpdate(IJobUpdate update, String lockToken) {
requireNonNull(update);
- write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder())));
- jobUpdateStore.saveJobUpdate(update);
+ write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken)));
+ jobUpdateStore.saveJobUpdate(update, lockToken);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 0802ee0..7ef2885 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -1265,7 +1265,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
requireNonNull(mutableRequest);
requireNonNull(session);
- final ILock lock = ILock.build(requireNonNull(mutableLock));
final Response response = emptyResponse();
final SessionContext context;
@@ -1288,10 +1287,12 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
return storage.write(new MutateWork.Quiet<Response>() {
@Override
public Response apply(MutableStoreProvider storeProvider) {
+ // TODO(wfarner): Move lock acquisition down into the update controller once introduced.
+ ILock lock;
try {
- lockManager.validateIfLocked(
+ lock = lockManager.acquireLock(
ILockKey.build(LockKey.job(request.getJobKey().newBuilder())),
- Optional.of(lock));
+ context.getIdentity());
} catch (LockException e) {
return addMessage(response, LOCK_ERROR, e);
}
@@ -1299,7 +1300,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
// TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
try {
- String updateId = jobUpdater.startJobUpdate(request, context.getIdentity());
+ String updateId =
+ jobUpdater.startJobUpdate(request, context.getIdentity(), lock.getToken());
return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
} catch (UpdaterException e) {
return addMessage(response, INVALID_REQUEST, e);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index 17c58b1..e59e6f7 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -17,6 +17,14 @@
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.aurora.scheduler.storage.db.JobUpdateDetailsMapper">
+ <sql id="selectUpdateIdentity">
+ (
+ SELECT id
+ FROM job_updates
+ WHERE update_id = #{updateId}
+ )
+ </sql>
+
<insert id="insert">
INSERT INTO job_updates (
job_key_id,
@@ -49,17 +57,23 @@
)
</insert>
+ <insert id="insertLockToken">
+ INSERT INTO job_update_locks (
+ update_id,
+ lock_token
+ ) VALUES (
+ <include refid="selectUpdateIdentity"/>,
+ #{lockToken}
+ )
+ </insert>
+
<insert id="insertTaskConfig" useGeneratedKeys="true" keyColumn="id" keyProperty="result.id">
INSERT INTO job_update_configs (
update_id,
task_config,
is_new
) VALUES (
- (
- SELECT id
- FROM job_updates
- WHERE update_id = #{updateId}
- ),
+ <include refid="selectUpdateIdentity"/>,
#{config, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.TaskConfigTypeHandler},
#{isNew}
)
@@ -85,11 +99,7 @@
last
) VALUES
<foreach item="element" collection="ranges" open="(" separator="),(" close=")">
- (
- SELECT id
- FROM job_updates
- WHERE update_id = #{updateId}
- ),
+ <include refid="selectUpdateIdentity"/>,
#{element.first},
#{element.last}
</foreach>
@@ -150,12 +160,12 @@
<association property="configuration" resultMap="jobUpdateConfigurationMap" columnPrefix="juc_"/>
</resultMap>
- <resultMap id="jobUpdateDetailsMap" type="org.apache.aurora.gen.JobUpdateDetails">
+ <resultMap id="jobUpdateDetailsMap" type="org.apache.aurora.gen.storage.StoredJobUpdateDetails">
<id column="u_id" />
- <association property="update" resultMap="jobUpdateMap" />
+ <association property="details.update" resultMap="jobUpdateMap" />
<!--Using notNullColumn attribute is required below as LEFT JOIN with empty right side
will produce an empty row.-->
- <collection property="updateEvents"
+ <collection property="details.updateEvents"
ofType="org.apache.aurora.gen.JobUpdateEvent"
columnPrefix="e_"
notNullColumn="id">
@@ -164,7 +174,7 @@
column="status"
typeHandler="org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler" />
</collection>
- <collection property="instanceEvents"
+ <collection property="details.instanceEvents"
ofType="org.apache.aurora.gen.JobInstanceUpdateEvent"
columnPrefix="i_"
notNullColumn="id">
@@ -262,7 +272,7 @@
attributes used in associations.
For example: jusm_just_status maps to JobUpdateSummary/JobUpdateState/status field.-->
<sql id="unscoped_details_select">
- SELECT
+ SELECT
u.id AS u_id,
u.id AS juc_juse_id,
u.update_id AS jusm_update_id,
@@ -297,7 +307,8 @@
ci.last AS juc_itc_r_last,
io.id AS juc_juse_r_id,
io.first AS juc_juse_r_first,
- io.last AS juc_juse_r_last
+ io.last AS juc_juse_r_last,
+ l.lock_token AS lock_token
FROM job_updates AS u
INNER JOIN job_keys AS j ON j.id = u.job_key_id
INNER JOIN job_update_configs AS cn ON cn.update_id = u.id AND cn.is_new = TRUE
@@ -307,6 +318,7 @@
LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_id = u.id
LEFT OUTER JOIN job_update_events AS e ON e.update_id = u.id
LEFT OUTER JOIN job_instance_update_events AS i ON i.update_id = u.id
+ LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
</sql>
<select id="selectDetails" resultMap="jobUpdateDetailsMap">
@@ -315,10 +327,16 @@
ORDER BY e_timestamp_ms, i_timestamp_ms
</select>
- <!--Order by ID to facilitate proper re-insertion during recovery.-->
<select id="selectAllDetails" resultMap="jobUpdateDetailsMap">
<include refid="unscoped_details_select"/>
- ORDER BY u_id
+ </select>
+
+ <select id="selectLockToken" resultType="String">
+ SELECT
+ lock_token
+ FROM job_update_locks AS l
+ INNER JOIN job_updates u ON l.update_id = u.id
+ WHERE u.update_id = #{id}
</select>
<delete id="truncate">
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
index 1cf803f..a450a09 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql
@@ -39,7 +39,8 @@ CREATE TABLE locks(
timestampMs BIGINT NOT NULL,
message VARCHAR,
- UNIQUE(job_key_id)
+ UNIQUE(job_key_id),
+ UNIQUE(token)
);
CREATE TABLE quotas(
@@ -109,6 +110,15 @@ CREATE TABLE job_updates(
UNIQUE(update_id)
);
+CREATE TABLE job_update_locks(
+ id IDENTITY,
+ update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE,
+ lock_token VARCHAR NOT NULL REFERENCES locks(token) ON DELETE CASCADE,
+
+ UNIQUE(update_id),
+ UNIQUE(lock_token)
+);
+
CREATE TABLE job_update_configs(
id IDENTITY,
update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage.thrift b/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 9f8378e..7e50245 100644
--- a/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -69,10 +69,18 @@ struct SaveHostAttributes {
struct SaveJobUpdate {
1: api.JobUpdate jobUpdate
+ 2: string lockToken
+}
+
+struct StoredJobUpdateDetails {
+ 1: api.JobUpdateDetails details
+ /** ID of the lock associated with this update. */
+ 2: string lockToken
}
struct SaveJobUpdateEvent {
1: api.JobUpdateEvent event
+ /** ID of the job update that this event is associated with. */
2: string updateId
}
@@ -147,7 +155,7 @@ struct Snapshot {
6: SchedulerMetadata schedulerMetadata
8: set<QuotaConfiguration> quotaConfigurations
9: set<api.Lock> locks
- 10: set<api.JobUpdateDetails> jobUpdateDetails
+ 10: set<StoredJobUpdateDetails> jobUpdateDetails
}
// A message header that calls out the number of expected FrameChunks to follow to form a complete
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage_local.thrift b/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
deleted file mode 100644
index becfd75..0000000
--- a/src/main/thrift/org/apache/aurora/gen/storage_local.thrift
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Thrift structures for a local log storage system, for use in simulated environments.
-namespace java org.apache.aurora.gen.test
-
-struct LogRecord {
- 1: binary contents
-}
-
-struct FileLogContents {
- 1: map<i64, LogRecord> records
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
index 1f985fb..90b4e8a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
@@ -59,6 +59,7 @@ public class JobUpdaterImplTest extends EasyMockTest {
private static final IJobKey JOB = JobKeys.from("role", "env", "name");
private static final Identity IDENTITY = new Identity("role", "user");
private static final Long TIMESTAMP = 1234L;
+ private static final String LOCK = "lock token";
private JobUpdater updater;
private StorageTestUtil storageUtil;
@@ -106,12 +107,14 @@ public class JobUpdaterImplTest extends EasyMockTest {
oldTask6,
oldTask7);
- storageUtil.updateStore.saveJobUpdate(update);
+ storageUtil.updateStore.saveJobUpdate(update, LOCK);
storageUtil.updateStore.saveJobUpdateEvent(buildUpdateEvent(), UPDATE_ID);
control.replay();
- assertEquals(UPDATE_ID, updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser()));
+ assertEquals(
+ UPDATE_ID,
+ updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK));
}
@Test(expected = UpdaterException.class)
@@ -128,12 +131,12 @@ public class JobUpdaterImplTest extends EasyMockTest {
storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB).active());
- storageUtil.updateStore.saveJobUpdate(update);
+ storageUtil.updateStore.saveJobUpdate(update, LOCK);
expectLastCall().andThrow(new Storage.StorageException("fail"));
control.replay();
- updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser());
+ updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK);
}
private static IJobUpdateRequest buildJobRequest(IJobUpdate update) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 2f2c3e1..5ac15ec 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -27,7 +27,6 @@ import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.Lock;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
@@ -35,6 +34,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
@@ -165,7 +165,7 @@ public class RecoveryTest extends EasyMockTest {
.setQuotaConfigurations(ImmutableSet.<QuotaConfiguration>of())
.setTasks(ImmutableSet.<ScheduledTask>builder().add(tasks).build())
.setLocks(ImmutableSet.<Lock>of())
- .setJobUpdateDetails(ImmutableSet.<JobUpdateDetails>of());
+ .setJobUpdateDetails(ImmutableSet.<StoredJobUpdateDetails>of());
}
private static ScheduledTask makeTask(String taskId) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index f695b85..f669dbe 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -15,6 +15,7 @@
package org.apache.aurora.scheduler.storage.db;
import java.util.List;
+import java.util.Set;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -34,8 +35,11 @@ import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateState;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -49,11 +53,14 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class DBJobUpdateStoreTest {
@@ -79,13 +86,13 @@ public class DBJobUpdateStoreTest {
String updateId1 = "u1";
String updateId2 = "u2";
- IJobUpdate update1 = makeJobUpdate(JOB, updateId1);
- IJobUpdate update2 = makeJobUpdate(JOB, updateId2);
+ IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1);
+ IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2);
- saveUpdate(update1);
+ saveUpdate(update1, "lock1");
assertEquals(populateExpected(update1), getUpdateDetails(updateId1).get().getUpdate());
- saveUpdate(update2);
+ saveUpdate(update2, "lock2");
assertEquals(populateExpected(update1), getUpdateDetails(updateId1).get().getUpdate());
}
@@ -100,7 +107,7 @@ public class DBJobUpdateStoreTest {
IJobUpdate expected = IJobUpdate.build(builder);
// Save with empty overrides.
- saveUpdate(expected);
+ saveUpdate(expected, "lock");
assertEquals(populateExpected(expected), getUpdateDetails(updateId).get().getUpdate());
}
@@ -116,7 +123,7 @@ public class DBJobUpdateStoreTest {
// Save with null overrides.
builder.getConfiguration().getSettings().setUpdateOnlyTheseInstances(null);
- saveUpdate(IJobUpdate.build(builder));
+ saveUpdate(IJobUpdate.build(builder), "lock");
assertEquals(populateExpected(expected), getUpdateDetails(updateId).get().getUpdate());
}
@@ -125,8 +132,8 @@ public class DBJobUpdateStoreTest {
String updateId = "u1";
IJobUpdate update = makeJobUpdate(JOB, updateId);
- saveUpdate(update);
- saveUpdate(update);
+ saveUpdate(update, "lock1");
+ saveUpdate(update, "lock2");
}
@Test
@@ -136,7 +143,7 @@ public class DBJobUpdateStoreTest {
IJobUpdateEvent event1 = makeJobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 124L);
IJobUpdateEvent event2 = makeJobUpdateEvent(JobUpdateStatus.ROLL_FORWARD_PAUSED, 125L);
- saveUpdate(update);
+ saveUpdate(update, "lock1");
assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
assertEquals(ImmutableList.of(FIRST_EVENT), getUpdateDetails(updateId).get().getUpdateEvents());
@@ -161,7 +168,7 @@ public class DBJobUpdateStoreTest {
IJobInstanceUpdateEvent event1 = makeJobInstanceEvent(0, 125L, JobUpdateAction.ADD_INSTANCE);
IJobInstanceUpdateEvent event2 = makeJobInstanceEvent(1, 126L, JobUpdateAction.ADD_INSTANCE);
- saveUpdate(update);
+ saveUpdate(update, "lock");
assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
assertEquals(0, getUpdateDetails(updateId).get().getInstanceEvents().size());
@@ -201,7 +208,7 @@ public class DBJobUpdateStoreTest {
567L,
567L);
- saveUpdate(update);
+ saveUpdate(update, "lock1");
// Assert state fields were ignored.
assertEquals(populateExpected(update), getUpdateDetails(updateId).get().getUpdate());
@@ -210,11 +217,12 @@ public class DBJobUpdateStoreTest {
@Test
public void testSaveJobUpdateWithoutEventFailsSelect() {
final String updateId = "u3";
- storage.write(new MutateWork.Quiet<Void>() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override
- public Void apply(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobUpdate(makeJobUpdate(JOB, updateId));
- return null;
+ public void execute(MutableStoreProvider storeProvider) {
+ IJobUpdate update = makeJobUpdate(JOB, updateId);
+ storeProvider.getLockStore().saveLock(makeLock(update.getSummary().getJobKey(), "lock1"));
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, "lock1");
}
});
assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
@@ -224,16 +232,18 @@ public class DBJobUpdateStoreTest {
public void testMultipleJobDetails() {
String updateId1 = "u1";
String updateId2 = "u2";
- IJobUpdateDetails details1 = makeJobDetails(makeJobUpdate(JOB, updateId1));
- IJobUpdateDetails details2 = makeJobDetails(makeJobUpdate(JOB, updateId2));
+ IJobUpdateDetails details1 =
+ makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1));
+ IJobUpdateDetails details2 =
+ makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2));
- saveUpdate(details1.getUpdate());
- saveUpdate(details2.getUpdate());
+ saveUpdate(details1.getUpdate(), "lock1");
+ saveUpdate(details2.getUpdate(), "lock2");
details1 = updateJobDetails(populateExpected(details1.getUpdate()), FIRST_EVENT);
details2 = updateJobDetails(populateExpected(details2.getUpdate()), FIRST_EVENT);
- assertEquals(details1, getUpdateDetails(updateId1).get());
- assertEquals(details2, getUpdateDetails(updateId2).get());
+ assertEquals(Optional.of(details1), getUpdateDetails(updateId1));
+ assertEquals(Optional.of(details2), getUpdateDetails(updateId2));
IJobUpdateEvent jEvent11 = makeJobUpdateEvent(JobUpdateStatus.ROLLING_FORWARD, 456L);
IJobUpdateEvent jEvent12 = makeJobUpdateEvent(JobUpdateStatus.ERROR, 457L);
@@ -263,10 +273,14 @@ public class DBJobUpdateStoreTest {
populateExpected(details2.getUpdate(), JobUpdateStatus.ABORTED, CREATED_MS, 568L),
ImmutableList.of(FIRST_EVENT, jEvent21, jEvent22), ImmutableList.of(iEvent21, iEvent22));
- assertEquals(details1, getUpdateDetails(updateId1).get());
- assertEquals(details2, getUpdateDetails(updateId2).get());
+ assertEquals(Optional.of(details1), getUpdateDetails(updateId1));
+ assertEquals(Optional.of(details2), getUpdateDetails(updateId2));
- assertEquals(ImmutableList.of(details1, details2), getAllUpdateDetails());
+ assertEquals(
+ ImmutableSet.of(
+ new StoredJobUpdateDetails(details1.newBuilder(), "lock1"),
+ new StoredJobUpdateDetails(details2.newBuilder(), "lock2")),
+ getAllUpdateDetails());
}
@Test
@@ -278,7 +292,7 @@ public class DBJobUpdateStoreTest {
IJobInstanceUpdateEvent instanceEvent = IJobInstanceUpdateEvent.build(
new JobInstanceUpdateEvent(0, 125L, JobUpdateAction.ADD_INSTANCE));
- saveUpdate(update);
+ saveUpdate(update, "lock");
saveJobEvent(updateEvent, updateId);
saveJobInstanceEvent(instanceEvent, updateId);
assertEquals(
@@ -291,14 +305,107 @@ public class DBJobUpdateStoreTest {
assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
}
+ @Test(expected = StorageException.class)
+ public void testSaveUpdateWithoutLock() {
+ final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, "lock");
+ }
+ });
+ }
+
+ @Test(expected = StorageException.class)
+ public void testSaveTwoUpdatesForOneJob() {
+ final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+ saveUpdate(update, "lock1");
+ saveUpdate(update, "lock2");
+ }
+
+ @Test(expected = StorageException.class)
+ public void testSaveTwoUpdatesSameJobKey() {
+ final IJobUpdate update1 = makeJobUpdate(JOB, "update1");
+ final IJobUpdate update2 = makeJobUpdate(JOB, "update2");
+ saveUpdate(update1, "lock1");
+ saveUpdate(update2, "lock1");
+ }
+
+ @Test
+ public void testLockCleared() {
+ final IJobUpdate update = makeJobUpdate(JOB, "update1");
+ saveUpdate(update, "lock1");
+
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getLockStore().removeLock(
+ makeLock(update.getSummary().getJobKey(), "lock1").getKey());
+ }
+ });
+
+ assertEquals(
+ Optional.of(updateJobDetails(populateExpected(update), FIRST_EVENT)),
+ getUpdateDetails("update1"));
+ assertEquals(
+ ImmutableSet.of(
+ new StoredJobUpdateDetails(
+ updateJobDetails(populateExpected(update), FIRST_EVENT).newBuilder(),
+ null)),
+ getAllUpdateDetails());
+
+ assertEquals(
+ ImmutableList.of(populateExpected(update).getSummary()),
+ getSummaries(new JobUpdateQuery().setUpdateId("update1")));
+
+ // If the lock has been released for this job, we can start another update.
+ saveUpdate(makeJobUpdate(JOB, "update2"), "lock2");
+ }
+
+ @Test
+ public void testIsActive() {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider storeProvider) {
+ final IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), "update1");
+ final IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), "update2");
+ saveUpdate(update1, "lock1");
+ assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
+ assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+
+ saveUpdate(update2, "lock2");
+ assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
+ assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+
+ storeProvider.getLockStore().removeLock(
+ makeLock(update1.getSummary().getJobKey(), "lock1").getKey());
+ assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
+ assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+
+ storeProvider.getLockStore().removeLock(
+ makeLock(update2.getSummary().getJobKey(), "lock2").getKey());
+ assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
+ assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+ }
+ });
+ }
+
@Test
public void testGetSummaries() {
- IJobKey job2 = JobKeys.from("role", "env", "name");
- IJobUpdateSummary s1 = saveSummary(JOB, "u1", 1230L, JobUpdateStatus.ROLLED_BACK, "user");
- IJobUpdateSummary s2 = saveSummary(JOB, "u2", 1231L, JobUpdateStatus.ABORTED, "user");
- IJobUpdateSummary s3 = saveSummary(JOB, "u3", 1239L, JobUpdateStatus.ERROR, "user2");
- IJobUpdateSummary s4 = saveSummary(JOB, "u4", 1234L, JobUpdateStatus.ROLL_BACK_PAUSED, "user3");
- IJobUpdateSummary s5 = saveSummary(job2, "u5", 1235L, JobUpdateStatus.ROLLING_FORWARD, "user4");
+ String role1 = "role1";
+ IJobKey job1 = JobKeys.from(role1, "env", "name1");
+ IJobKey job2 = JobKeys.from(role1, "env", "name2");
+ IJobKey job3 = JobKeys.from(role1, "env", "name3");
+ IJobKey job4 = JobKeys.from(role1, "env", "name4");
+ IJobKey job5 = JobKeys.from("role", "env", "name5");
+ IJobUpdateSummary s1 =
+ saveSummary(job1, "u1", 1230L, JobUpdateStatus.ROLLED_BACK, "user", "lock1");
+ IJobUpdateSummary s2 = saveSummary(job2, "u2", 1231L, JobUpdateStatus.ABORTED, "user", "lock2");
+ IJobUpdateSummary s3 = saveSummary(job3, "u3", 1239L, JobUpdateStatus.ERROR, "user2", "lock3");
+ IJobUpdateSummary s4 =
+ saveSummary(job4, "u4", 1234L, JobUpdateStatus.ROLL_BACK_PAUSED, "user3", "lock4");
+ IJobUpdateSummary s5 =
+ saveSummary(job5, "u5", 1235L, JobUpdateStatus.ROLLING_FORWARD, "user4", "lock5");
// Test empty query returns all.
assertEquals(ImmutableList.of(s1, s2, s4, s5, s3), getSummaries(new JobUpdateQuery()));
@@ -309,12 +416,12 @@ public class DBJobUpdateStoreTest {
// Test query by role.
assertEquals(
ImmutableList.of(s1, s2, s4, s3),
- getSummaries(new JobUpdateQuery().setRole(JOB.getRole())));
+ getSummaries(new JobUpdateQuery().setRole(role1)));
// Test query by job key.
assertEquals(
ImmutableList.of(s5),
- getSummaries(new JobUpdateQuery().setJobKey(job2.newBuilder())));
+ getSummaries(new JobUpdateQuery().setJobKey(job5.newBuilder())));
// Test query by user.
assertEquals(ImmutableList.of(s1, s2), getSummaries(new JobUpdateQuery().setUser("user")));
@@ -361,10 +468,10 @@ public class DBJobUpdateStoreTest {
});
}
- private List<IJobUpdateDetails> getAllUpdateDetails() {
- return storage.consistentRead(new Quiet<List<IJobUpdateDetails>>() {
+ private Set<StoredJobUpdateDetails> getAllUpdateDetails() {
+ return storage.consistentRead(new Quiet<Set<StoredJobUpdateDetails>>() {
@Override
- public List<IJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
+ public Set<StoredJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchAllJobUpdateDetails();
}
});
@@ -380,45 +487,50 @@ public class DBJobUpdateStoreTest {
});
}
- private void saveUpdate(final IJobUpdate update) {
- storage.write(new MutateWork.Quiet<Void>() {
+ private static ILock makeLock(IJobKey jobKey, String lockToken) {
+ return ILock.build(new Lock()
+ .setKey(LockKey.job(jobKey.newBuilder()))
+ .setToken(lockToken)
+ .setTimestampMs(100)
+ .setUser("fake user"));
+ }
+
+ private void saveUpdate(final IJobUpdate update, final String lockToken) {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override
- public Void apply(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobUpdate(update);
+ public void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getLockStore().saveLock(makeLock(update.getSummary().getJobKey(), lockToken));
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
storeProvider.getJobUpdateStore().saveJobUpdateEvent(
FIRST_EVENT,
update.getSummary().getUpdateId());
- return null;
}
});
}
private void saveJobEvent(final IJobUpdateEvent event, final String updateId) {
- storage.write(new MutateWork.Quiet<Void>() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override
- public Void apply(MutableStoreProvider storeProvider) {
+ public void execute(MutableStoreProvider storeProvider) {
storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
- return null;
}
});
}
private void saveJobInstanceEvent(final IJobInstanceUpdateEvent event, final String updateId) {
- storage.write(new MutateWork.Quiet<Void>() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override
- public Void apply(MutableStoreProvider storeProvider) {
+ public void execute(MutableStoreProvider storeProvider) {
storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(event, updateId);
- return null;
}
});
}
private void truncateUpdates() {
- storage.write(new MutateWork.Quiet<Void>() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override
- public Void apply(MutableStoreProvider storeProvider) {
+ public void execute(MutableStoreProvider storeProvider) {
storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
- return null;
}
});
}
@@ -458,7 +570,7 @@ public class DBJobUpdateStoreTest {
private IJobUpdateDetails makeJobDetails(IJobUpdate update) {
return updateJobDetails(
update,
- ImmutableList.<IJobUpdateEvent>of(FIRST_EVENT),
+ ImmutableList.of(FIRST_EVENT),
ImmutableList.<IJobInstanceUpdateEvent>of());
}
@@ -492,7 +604,8 @@ public class DBJobUpdateStoreTest {
String updateId,
Long modifiedTimestampMs,
JobUpdateStatus status,
- String user) {
+ String user,
+ String lockToken) {
IJobUpdateSummary summary = IJobUpdateSummary.build(new JobUpdateSummary()
.setUpdateId(updateId)
@@ -500,7 +613,7 @@ public class DBJobUpdateStoreTest {
.setUser(user));
IJobUpdate update = makeJobUpdate(summary);
- saveUpdate(update);
+ saveUpdate(update, lockToken);
saveJobEvent(makeJobUpdateEvent(status, modifiedTimestampMs), updateId);
return populateExpected(update, status, CREATED_MS, modifiedTimestampMs).getSummary();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
index ae4cef4..e9b210f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
@@ -85,10 +85,10 @@ public class DbLockStoreTest {
});
}
- private static ILock makeLock(JobKey key) {
+ private static ILock makeLock(JobKey key, String token) {
return ILock.build(new Lock()
.setKey(LockKey.job(key))
- .setToken("lock1")
+ .setToken(token)
.setUser("testUser")
.setMessage("Test message")
.setTimestampMs(12345L));
@@ -108,8 +108,8 @@ public class DbLockStoreTest {
String job1 = "testJob1";
String job2 = "testJob2";
- ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder());
- ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder());
+ ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder(), "token1");
+ ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder(), "token2");
saveLocks(lock1, lock2);
assertLocks(lock1, lock2);
@@ -126,7 +126,7 @@ public class DbLockStoreTest {
String env = "testEnv";
String job = "testJob";
- ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder());
+ ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder(), "token1");
saveLocks(lock);
try {
@@ -145,7 +145,7 @@ public class DbLockStoreTest {
String env = "testEnv";
String job = "testJob";
- ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder());
+ ILock lock = makeLock(JobKeys.from(role, env, job).newBuilder(), "token1");
saveLocks(lock);
removeLocks(lock);
@@ -163,8 +163,8 @@ public class DbLockStoreTest {
String env = "testEnv";
String job = "testJob";
- ILock lock1 = makeLock(JobKeys.from(role1, env, job).newBuilder());
- ILock lock2 = makeLock(JobKeys.from(role2, env, job).newBuilder());
+ ILock lock1 = makeLock(JobKeys.from(role1, env, job).newBuilder(), "token1");
+ ILock lock2 = makeLock(JobKeys.from(role2, env, job).newBuilder(), "token2");
assertEquals(Optional.<ILock>absent(), getLock(lock1.getKey()));
assertEquals(Optional.<ILock>absent(), getLock(lock2.getKey()));
@@ -189,8 +189,8 @@ public class DbLockStoreTest {
String job1 = "testJob1";
String job2 = "testJob2";
- ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder());
- ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder());
+ ILock lock1 = makeLock(JobKeys.from(role, env, job1).newBuilder(), "token1");
+ ILock lock2 = makeLock(JobKeys.from(role, env, job2).newBuilder(), "token2");
saveLocks(lock1, lock2);
assertLocks(lock1, lock2);
@@ -205,4 +205,18 @@ public class DbLockStoreTest {
assertLocks();
}
+
+ @Test
+ public void testDuplicateToken() throws Exception {
+ ILock lock = makeLock(JobKeys.from("role", "env", "job1").newBuilder(), "token1");
+ saveLocks(lock);
+ try {
+ saveLocks(makeLock(JobKeys.from("role", "env", "job2").newBuilder(), "token1"));
+ fail();
+ } catch (StorageException e) {
+ // Expected.
+ }
+
+ assertLocks(lock);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index ebcb910..78798f2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -763,19 +763,21 @@ public class LogStorageTest extends EasyMockTest {
.setTask(new TaskConfig())
.setInstances(ImmutableSet.of(new Range(0, 3)))))
.setSettings(new JobUpdateSettings())));
+ final String lockToken = "token";
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
storageUtil.expectWriteOperation();
- storageUtil.updateStore.saveJobUpdate(update);
- streamMatcher.expectTransaction(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder())))
+ storageUtil.updateStore.saveJobUpdate(update, lockToken);
+ streamMatcher.expectTransaction(
+ Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken)))
.andReturn(position);
}
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobUpdate(update);
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
}
}.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
index bee9c9c..c752e66 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
@@ -42,6 +42,7 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ResourceAggregates;
@@ -117,8 +118,10 @@ public class SnapshotStoreImplTest extends EasyMockTest {
.andReturn(ImmutableSet.of(IJobConfiguration.build(job.getJobConfiguration())));
expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(frameworkId));
expect(storageUtil.lockStore.fetchLocks()).andReturn(ImmutableSet.of(lock));
+ String lockToken = "token";
expect(storageUtil.updateStore.fetchAllJobUpdateDetails())
- .andReturn(ImmutableList.of(updateDetails));
+ .andReturn(ImmutableSet.of(
+ new StoredJobUpdateDetails(updateDetails.newBuilder(), lockToken)));
expectDataWipe();
storageUtil.taskStore.saveTasks(tasks);
@@ -129,7 +132,7 @@ public class SnapshotStoreImplTest extends EasyMockTest {
IJobConfiguration.build(job.getJobConfiguration()));
storageUtil.schedulerStore.saveFrameworkId(frameworkId);
storageUtil.lockStore.saveLock(lock);
- storageUtil.updateStore.saveJobUpdate(updateDetails.getUpdate());
+ storageUtil.updateStore.saveJobUpdate(updateDetails.getUpdate(), lockToken);
storageUtil.updateStore.saveJobUpdateEvent(
Iterables.getOnlyElement(updateDetails.getUpdateEvents()),
updateId);
@@ -147,7 +150,8 @@ public class SnapshotStoreImplTest extends EasyMockTest {
.setJobs(ImmutableSet.of(job))
.setSchedulerMetadata(metadata)
.setLocks(ImmutableSet.of(lock.newBuilder()))
- .setJobUpdateDetails(ImmutableSet.of(updateDetails.newBuilder()));
+ .setJobUpdateDetails(ImmutableSet.of(
+ new StoredJobUpdateDetails(updateDetails.newBuilder(), lockToken)));
assertEquals(expected, snapshotStore.createSnapshot());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4f5278/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 7dbf97a..649afa2 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -164,7 +164,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
private static final SessionKey SESSION = new SessionKey();
private static final IJobKey JOB_KEY = JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, JOB_NAME);
private static final ILockKey LOCK_KEY = ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
- private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
+ private static final ILock LOCK =
+ ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()).setToken("token"));
private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
private static final Lock DEFAULT_LOCK = null;
private static final String TASK_ID = "task_id";
@@ -1811,8 +1812,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void testStartUpdate() throws Exception {
JobUpdateRequest request = createJobRequest(populatedTask());
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER)).andReturn(UPDATE_ID);
+ expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
+ expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
+ .andReturn(UPDATE_ID);
control.replay();
@@ -1843,8 +1845,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void testStartUpdateFailsLockValidation() throws Exception {
JobUpdateRequest request = createJobRequest(populatedTask());
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- expectLastCall().andThrow(new LockException("lock failed"));
+ expect(lockManager.acquireLock(LOCK_KEY, USER)).andThrow(new LockException("lock failed"));
control.replay();
@@ -1855,8 +1856,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void testStartUpdateFailsInUpdater() throws Exception {
JobUpdateRequest request = createJobRequest(populatedTask());
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER))
+ expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
+ expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
.andThrow(new UpdaterException("failed update"));
control.replay();