You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/14 00:39:22 UTC
[2/2] aurora git commit: Remove LockStore
Remove LockStore
Reviewed at https://reviews.apache.org/r/63744/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4fecf1f5
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4fecf1f5
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4fecf1f5
Branch: refs/heads/master
Commit: 4fecf1f594e09a5ed6909df49faeee51e5007f8e
Parents: fb64df2
Author: Bill Farner <wf...@apache.org>
Authored: Mon Nov 13 16:35:36 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Nov 13 16:35:36 2017 -0800
----------------------------------------------------------------------
.../thrift/org/apache/aurora/gen/api.thrift | 19 --
.../thrift/org/apache/aurora/gen/storage.thrift | 19 +-
.../org/apache/aurora/benchmark/JobUpdates.java | 13 +-
.../aurora/benchmark/ThriftApiBenchmarks.java | 2 -
.../aurora/benchmark/UpdateStoreBenchmarks.java | 3 -
.../ShiroAuthorizingParamInterceptor.java | 10 -
.../aurora/scheduler/state/LockManager.java | 50 -----
.../aurora/scheduler/state/LockManagerImpl.java | 93 ---------
.../aurora/scheduler/state/StateModule.java | 3 -
.../scheduler/storage/JobUpdateStore.java | 9 +-
.../aurora/scheduler/storage/LockStore.java | 61 ------
.../aurora/scheduler/storage/Storage.java | 2 -
.../scheduler/storage/log/LogStorage.java | 20 +-
.../storage/log/SnapshotStoreImpl.java | 37 +---
.../storage/log/WriteAheadStorage.java | 56 +-----
.../storage/mem/MemJobUpdateStore.java | 81 ++------
.../scheduler/storage/mem/MemLockStore.java | 72 -------
.../scheduler/storage/mem/MemStorage.java | 7 -
.../scheduler/storage/mem/MemStorageModule.java | 2 -
.../thrift/SchedulerThriftInterface.java | 24 +--
.../updater/JobUpdateControllerImpl.java | 19 +-
.../scheduler/http/AbstractJettyTest.java | 2 -
.../scheduler/state/LockManagerImplTest.java | 99 ---------
.../storage/AbstractJobUpdateStoreTest.java | 165 +++++----------
.../storage/AbstractLockStoreTest.java | 200 -------------------
.../scheduler/storage/backup/RecoveryTest.java | 1 -
.../scheduler/storage/log/LogStorageTest.java | 80 +-------
.../storage/log/SnapshotStoreImplIT.java | 11 +-
.../storage/log/WriteAheadStorageTest.java | 8 -
.../scheduler/storage/mem/MemLockStoreTest.java | 24 ---
.../storage/testing/StorageTestUtil.java | 5 -
.../aurora/scheduler/thrift/Fixtures.java | 3 -
.../thrift/SchedulerThriftInterfaceTest.java | 77 +------
.../aurora/scheduler/updater/JobUpdaterIT.java | 183 ++---------------
.../aurora/client/api/test_scheduler_client.py | 6 +-
35 files changed, 126 insertions(+), 1340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index c869493..1d36926 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -115,25 +115,6 @@ struct JobKey {
3: string name
}
-/** A unique lock key. */
-union LockKey {
- 1: JobKey job
-}
-
-/** A generic lock struct to facilitate context specific resource/operation serialization. */
-struct Lock {
- /** ID of the lock - unique per storage */
- 1: LockKey key
- /** UUID - facilitating soft lock authorization */
- 2: string token
- /** Lock creator */
- 3: string user
- /** Lock creation timestamp in milliseconds */
- 4: i64 timestampMs
- /** Optional message to record with the lock */
- 5: optional string message
-}
-
/** A unique identifier for the active task within a job. */
struct InstanceKey {
/** Key identifying the job. */
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 74983ba..c692a5f 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -28,14 +28,6 @@ struct SaveCronJob {
2: api.JobConfiguration jobConfig
}
-struct SaveLock {
- 1: api.Lock lock
-}
-
-struct RemoveLock {
- 1: api.LockKey lockKey
-}
-
struct RemoveJob {
2: api.JobKey jobKey
}
@@ -63,13 +55,12 @@ struct SaveHostAttributes {
struct SaveJobUpdate {
1: api.JobUpdate jobUpdate
- 2: string lockToken
+ // 2: deleted
}
struct StoredJobUpdateDetails {
1: api.JobUpdateDetails details
- /** ID of the lock associated with this update. */
- 2: string lockToken
+ // 2: deleted
}
struct SaveJobUpdateEvent {
@@ -97,8 +88,8 @@ union Op {
9: RemoveQuota removeQuota
10: SaveHostAttributes saveHostAttributes
// 11: removed
- 12: SaveLock saveLock
- 13: RemoveLock removeLock
+ // 12: deleted
+ // 13: deleted
14: SaveJobUpdate saveJobUpdate
15: SaveJobUpdateEvent saveJobUpdateEvent
16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
@@ -142,7 +133,7 @@ struct Snapshot {
5: set<StoredCronJob> cronJobs
6: SchedulerMetadata schedulerMetadata
8: set<QuotaConfiguration> quotaConfigurations
- 9: set<api.Lock> locks
+ // 9: deleted
10: set<StoredJobUpdateDetails> jobUpdateDetails
//11: removed
//12: removed
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index cedddf4..a5d1894 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -17,7 +17,6 @@ import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -33,8 +32,6 @@ import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
import org.apache.aurora.gen.Metadata;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.TaskConfig;
@@ -46,7 +43,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
/**
* Job update factory.
@@ -71,14 +67,7 @@ final class JobUpdates {
for (IJobUpdateDetails details : updates) {
IJobUpdateKey key = details.getUpdate().getSummary().getKey();
keyBuilder.add(key);
- String lockToken = UUID.randomUUID().toString();
- store.getLockStore().saveLock(ILock.build(new Lock()
- .setKey(LockKey.job(key.getJob().newBuilder()))
- .setToken(lockToken)
- .setUser(Builder.USER)
- .setTimestampMs(0L)));
-
- updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken));
+ updateStore.saveJobUpdate(details.getUpdate());
for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) {
updateStore.saveJobUpdateEvent(key, updateEvent);
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index 7ccdb11..05071a5 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -35,7 +35,6 @@ import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
@@ -148,7 +147,6 @@ public class ThriftApiBenchmarks {
bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
bind(CronPredictor.class).toInstance(createThrowingFake(CronPredictor.class));
bind(QuotaManager.class).toInstance(createThrowingFake(QuotaManager.class));
- bind(LockManager.class).toInstance(createThrowingFake(LockManager.class));
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(ConfigurationManager.class).toInstance(TaskTestUtil.CONFIGURATION_MANAGER);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index 992e950..c98c514 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -68,7 +68,6 @@ public class UpdateStoreBenchmarks {
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
- storeProvider.getLockStore().deleteLocks();
});
}
@@ -108,7 +107,6 @@ public class UpdateStoreBenchmarks {
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
- storeProvider.getLockStore().deleteLocks();
});
}
@@ -148,7 +146,6 @@ public class UpdateStoreBenchmarks {
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
- storeProvider.getLockStore().deleteLocks();
});
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
index 474a403..203599e 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
@@ -43,8 +43,6 @@ import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateRequest;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
import org.apache.aurora.gen.Response;
import org.apache.aurora.gen.ResponseCode;
import org.apache.aurora.gen.TaskConfig;
@@ -106,12 +104,6 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor {
private static final FieldGetter<JobConfiguration, JobKey> JOB_CONFIGURATION_GETTER =
new ThriftFieldGetter<>(JobConfiguration.class, JobConfiguration._Fields.KEY, JobKey.class);
- private static final FieldGetter<Lock, LockKey> LOCK_GETTER =
- new ThriftFieldGetter<>(Lock.class, Lock._Fields.KEY, LockKey.class);
-
- private static final FieldGetter<LockKey, JobKey> LOCK_KEY_GETTER =
- new ThriftFieldGetter<>(LockKey.class, LockKey._Fields.JOB, JobKey.class);
-
private static final FieldGetter<JobUpdateKey, JobKey> JOB_UPDATE_KEY_GETTER =
new ThriftFieldGetter<>(JobUpdateKey.class, JobUpdateKey._Fields.JOB, JobKey.class);
@@ -124,8 +116,6 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor {
FieldGetters.compose(UPDATE_REQUEST_GETTER, TASK_CONFIG_GETTER),
TASK_CONFIG_GETTER,
JOB_CONFIGURATION_GETTER,
- FieldGetters.compose(LOCK_GETTER, LOCK_KEY_GETTER),
- LOCK_KEY_GETTER,
JOB_UPDATE_KEY_GETTER,
INSTANCE_KEY_GETTER,
new IdentityFieldGetter<>(JobKey.class));
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
deleted file mode 100644
index 1a65b08..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-
-/**
- * Defines all {@link ILock} primitives like: acquire, release, validate.
- */
-public interface LockManager {
- /**
- * Creates, saves and returns a new {@link ILock} with the specified {@link IJobKey}.
- * This method is not re-entrant, i.e. attempting to acquire a lock with the
- * same key would throw a {@link LockException}.
- *
- * @param job The job being locked.
- * @param user Name of the user requesting a lock.
- * @return A new ILock instance.
- * @throws LockException In case the lock with specified key already exists.
- */
- ILock acquireLock(IJobKey job, String user) throws LockException;
-
- /**
- * Releases (removes) the lock associated with {@code job} from the system.
- *
- * @param job the job to unlock.
- */
- void releaseLock(IJobKey job);
-
- /**
- * Thrown when {@link ILock} related operation failed.
- */
- class LockException extends Exception {
- public LockException(String msg) {
- super(msg);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
deleted file mode 100644
index ec05f50..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Date;
-import java.util.Optional;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.gen.LockKey._Fields;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Implements lock-related primitives required to provide mutual exclusion guarantees
- * to the critical Scheduler state-mutating operations.
- */
-@VisibleForTesting
-public class LockManagerImpl implements LockManager {
- private final Storage storage;
- private final Clock clock;
- private final UUIDGenerator tokenGenerator;
-
- @Inject
- LockManagerImpl(Storage storage, Clock clock, UUIDGenerator tokenGenerator) {
- this.storage = requireNonNull(storage);
- this.clock = requireNonNull(clock);
- this.tokenGenerator = requireNonNull(tokenGenerator);
- }
-
- @Override
- public ILock acquireLock(IJobKey job, final String user) throws LockException {
- return storage.write(storeProvider -> {
-
- LockStore.Mutable lockStore = storeProvider.getLockStore();
- ILockKey lockKey = ILockKey.build(LockKey.job(job.newBuilder()));
- Optional<ILock> existingLock = lockStore.fetchLock(lockKey);
-
- if (existingLock.isPresent()) {
- throw new LockException(String.format(
- "Operation for: %s is already in progress. Started at: %s. Current owner: %s.",
- formatLockKey(lockKey),
- new Date(existingLock.get().getTimestampMs()).toString(),
- existingLock.get().getUser()));
- }
-
- ILock lock = ILock.build(new Lock()
- .setKey(lockKey.newBuilder())
- .setToken(tokenGenerator.createNew().toString())
- .setTimestampMs(clock.nowMillis())
- .setUser(user));
-
- lockStore.saveLock(lock);
- return lock;
- });
- }
-
- @Override
- public void releaseLock(IJobKey job) {
- storage.write((NoResult.Quiet) storeProvider -> {
- storeProvider.getLockStore().removeLock(ILockKey.build(LockKey.job(job.newBuilder())));
- });
- }
-
- private static String formatLockKey(ILockKey lockKey) {
- return lockKey.getSetField() == _Fields.JOB
- ? JobKeys.canonicalString(lockKey.getJob())
- : "Unknown lock key type: " + lockKey.getSetField();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index d72f055..c03fff1 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -14,7 +14,6 @@
package org.apache.aurora.scheduler.state;
import java.util.List;
-
import javax.inject.Singleton;
import com.beust.jcommander.Parameter;
@@ -64,8 +63,6 @@ public class StateModule extends AbstractModule {
bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
bind(UUIDGeneratorImpl.class).in(Singleton.class);
- bind(LockManager.class).to(LockManagerImpl.class);
- bind(LockManagerImpl.class).in(Singleton.class);
bindMaintenanceController(binder());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index 5b57399..b3d906b 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -20,7 +20,6 @@ import java.util.Set;
import com.google.common.base.Optional;
import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
@@ -96,8 +95,7 @@ public interface JobUpdateStore {
*
* @return A read-only view of all job update details.
*/
- Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
-
+ Set<IJobUpdateDetails> fetchAllJobUpdateDetails();
/**
* Fetches the events that have affected an instance within a job update.
*
@@ -125,11 +123,8 @@ public interface JobUpdateStore {
* without having at least one {@link IJobUpdateEvent} present in the store will return empty.
*
* @param update Update to save.
- * @param lockToken Optional UUID identifying the lock associated with this update.
- * The {@code lockToken} can be absent when terminal updates are re-inserted
- * during snapshot restore.
*/
- void saveJobUpdate(IJobUpdate update, Optional<String> lockToken);
+ void saveJobUpdate(IJobUpdate update);
/**
* Saves a new job update event.
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
deleted file mode 100644
index 9764a01..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
-
-/**
- * Stores all lock-related data and defines methods for saving, deleting and fetching locks.
- */
-public interface LockStore {
- /**
- * Fetches all locks available in the store.
- *
- * @return All locks in the store.
- */
- Set<ILock> fetchLocks();
-
- /**
- * Fetches a lock by its key.
- *
- * @param lockKey Key of the lock to fetch.
- * @return Optional lock.
- */
- Optional<ILock> fetchLock(ILockKey lockKey);
-
- interface Mutable extends LockStore {
- /**
- * Saves a new lock or overwrites the existing one with same LockKey.
- *
- * @param lock ILock to save.
- */
- void saveLock(ILock lock);
-
- /**
- * Removes the lock from the store.
- *
- * @param lockKey Key of the lock to remove.
- */
- void removeLock(ILockKey lockKey);
-
- /**
- * Deletes all locks from the store.
- */
- void deleteLocks();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 7e810ab..7d325b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -40,7 +40,6 @@ public interface Storage {
SchedulerStore getSchedulerStore();
CronJobStore getCronJobStore();
TaskStore getTaskStore();
- LockStore getLockStore();
QuotaStore getQuotaStore();
AttributeStore getAttributeStore();
JobUpdateStore getJobUpdateStore();
@@ -69,7 +68,6 @@ public interface Storage {
*/
TaskStore.Mutable getUnsafeTaskStore();
- LockStore.Mutable getLockStore();
QuotaStore.Mutable getQuotaStore();
AttributeStore.Mutable getAttributeStore();
JobUpdateStore.Mutable getJobUpdateStore();
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3c9bae4..3ce2c7f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -24,7 +24,6 @@ import java.util.function.Consumer;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
@@ -51,7 +50,6 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.SnapshotStore;
@@ -64,8 +62,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +174,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
private final SchedulerStore.Mutable writeBehindSchedulerStore;
private final CronJobStore.Mutable writeBehindJobStore;
private final TaskStore.Mutable writeBehindTaskStore;
- private final LockStore.Mutable writeBehindLockStore;
private final QuotaStore.Mutable writeBehindQuotaStore;
private final AttributeStore.Mutable writeBehindAttributeStore;
private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
@@ -211,7 +206,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@Volatile SchedulerStore.Mutable schedulerStore,
@Volatile CronJobStore.Mutable jobStore,
@Volatile TaskStore.Mutable taskStore,
- @Volatile LockStore.Mutable lockStore,
@Volatile QuotaStore.Mutable quotaStore,
@Volatile AttributeStore.Mutable attributeStore,
@Volatile JobUpdateStore.Mutable jobUpdateStore,
@@ -227,7 +221,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
schedulerStore,
jobStore,
taskStore,
- lockStore,
quotaStore,
attributeStore,
jobUpdateStore,
@@ -246,7 +239,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
SchedulerStore.Mutable schedulerStore,
CronJobStore.Mutable jobStore,
TaskStore.Mutable taskStore,
- LockStore.Mutable lockStore,
QuotaStore.Mutable quotaStore,
AttributeStore.Mutable attributeStore,
JobUpdateStore.Mutable jobUpdateStore,
@@ -267,7 +259,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
this.writeBehindJobStore = requireNonNull(jobStore);
this.writeBehindTaskStore = requireNonNull(taskStore);
- this.writeBehindLockStore = requireNonNull(lockStore);
this.writeBehindQuotaStore = requireNonNull(quotaStore);
this.writeBehindAttributeStore = requireNonNull(attributeStore);
this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
@@ -289,7 +280,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
schedulerStore,
jobStore,
taskStore,
- lockStore,
quotaStore,
attributeStore,
jobUpdateStore,
@@ -360,16 +350,9 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
LOG.info("Dropping host attributes with no agent ID: " + attributes);
}
})
- .put(
- Op._Fields.SAVE_LOCK,
- op -> writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock())))
- .put(
- Op._Fields.REMOVE_LOCK,
- op -> writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey())))
.put(Op._Fields.SAVE_JOB_UPDATE, op ->
writeBehindJobUpdateStore.saveJobUpdate(
- thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()),
- Optional.fromNullable(op.getSaveJobUpdate().getLockToken())))
+ thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
.put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
writeBehindJobUpdateStore.saveJobUpdateEvent(
@@ -487,7 +470,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
LOG.info("Snapshot complete."
+ " host attrs: " + snapshot.getHostAttributesSize()
+ ", cron jobs: " + snapshot.getCronJobsSize()
- + ", locks: " + snapshot.getLocksSize()
+ ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+ ", tasks: " + snapshot.getTasksSize()
+ ", updates: " + snapshot.getJobUpdateDetailsSize());
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 6462b80..57c483b 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -16,11 +16,11 @@ package org.apache.aurora.scheduler.storage.log;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -36,7 +36,6 @@ import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.Lock;
import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
@@ -54,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
@@ -75,7 +73,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
- private static final String LOCK_FIELD = "locks";
private static final String HOST_ATTRIBUTES_FIELD = "hosts";
private static final String QUOTA_FIELD = "quota";
private static final String TASK_FIELD = "tasks";
@@ -94,29 +91,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
new SnapshotField() {
@Override
public String getName() {
- return LOCK_FIELD;
- }
-
- // It's important for locks to be replayed first, since there are relations that expect
- // references to be valid on insertion.
- @Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
- }
-
- @Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- if (snapshot.getLocksSize() > 0) {
- store.getLockStore().deleteLocks();
- for (Lock lock : snapshot.getLocks()) {
- store.getLockStore().saveLock(ILock.build(lock));
- }
- }
- }
- },
- new SnapshotField() {
- @Override
- public String getName() {
return HOST_ATTRIBUTES_FIELD;
}
@@ -243,7 +217,10 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
@Override
public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
- snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails());
+ snapshot.setJobUpdateDetails(
+ store.getJobUpdateStore().fetchAllJobUpdateDetails().stream()
+ .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
+ .collect(Collectors.toSet()));
}
@Override
@@ -253,9 +230,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
updateStore.deleteAllUpdatesAndEvents();
for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
JobUpdateDetails details = storedDetails.getDetails();
- updateStore.saveJobUpdate(
- thriftBackfill.backFillJobUpdate(details.getUpdate()),
- Optional.fromNullable(storedDetails.getLockToken()));
+ updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));
if (details.getUpdateEventsSize() > 0) {
for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index a5b58e8..4d051fc 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveLock;
import org.apache.aurora.gen.storage.RemoveQuota;
import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveCronJob;
@@ -34,17 +33,14 @@ import org.apache.aurora.gen.storage.SaveHostAttributes;
import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
import org.apache.aurora.gen.storage.SaveJobUpdate;
import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveLock;
import org.apache.aurora.gen.storage.SaveQuota;
import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -60,8 +56,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
@@ -80,7 +74,6 @@ class WriteAheadStorage implements
SchedulerStore.Mutable,
CronJobStore.Mutable,
TaskStore.Mutable,
- LockStore.Mutable,
QuotaStore.Mutable,
AttributeStore.Mutable,
JobUpdateStore.Mutable {
@@ -89,7 +82,6 @@ class WriteAheadStorage implements
private final SchedulerStore.Mutable schedulerStore;
private final CronJobStore.Mutable jobStore;
private final TaskStore.Mutable taskStore;
- private final LockStore.Mutable lockStore;
private final QuotaStore.Mutable quotaStore;
private final AttributeStore.Mutable attributeStore;
private final JobUpdateStore.Mutable jobUpdateStore;
@@ -103,7 +95,6 @@ class WriteAheadStorage implements
* @param schedulerStore Delegate.
* @param jobStore Delegate.
* @param taskStore Delegate.
- * @param lockStore Delegate.
* @param quotaStore Delegate.
* @param attributeStore Delegate.
* @param jobUpdateStore Delegate.
@@ -113,7 +104,6 @@ class WriteAheadStorage implements
SchedulerStore.Mutable schedulerStore,
CronJobStore.Mutable jobStore,
TaskStore.Mutable taskStore,
- LockStore.Mutable lockStore,
QuotaStore.Mutable quotaStore,
AttributeStore.Mutable attributeStore,
JobUpdateStore.Mutable jobUpdateStore,
@@ -124,7 +114,6 @@ class WriteAheadStorage implements
this.schedulerStore = requireNonNull(schedulerStore);
this.jobStore = requireNonNull(jobStore);
this.taskStore = requireNonNull(taskStore);
- this.lockStore = requireNonNull(lockStore);
this.quotaStore = requireNonNull(quotaStore);
this.attributeStore = requireNonNull(attributeStore);
this.jobUpdateStore = requireNonNull(jobUpdateStore);
@@ -221,27 +210,11 @@ class WriteAheadStorage implements
}
@Override
- public void saveLock(final ILock lock) {
- requireNonNull(lock);
-
- write(Op.saveLock(new SaveLock(lock.newBuilder())));
- lockStore.saveLock(lock);
- }
-
- @Override
- public void removeLock(final ILockKey lockKey) {
- requireNonNull(lockKey);
-
- write(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
- lockStore.removeLock(lockKey);
- }
-
- @Override
- public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
+ public void saveJobUpdate(IJobUpdate update) {
requireNonNull(update);
- write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken.orNull())));
- jobUpdateStore.saveJobUpdate(update, lockToken);
+ write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+ jobUpdateStore.saveJobUpdate(update);
}
@Override
@@ -306,12 +279,6 @@ class WriteAheadStorage implements
}
@Override
- public void deleteLocks() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
public void deleteAllUpdatesAndEvents() {
throw new UnsupportedOperationException(
"Unsupported since casual storage users should never be doing this.");
@@ -333,11 +300,6 @@ class WriteAheadStorage implements
}
@Override
- public LockStore.Mutable getLockStore() {
- return this;
- }
-
- @Override
public QuotaStore.Mutable getQuotaStore() {
return this;
}
@@ -398,16 +360,6 @@ class WriteAheadStorage implements
}
@Override
- public Set<ILock> fetchLocks() {
- return this.lockStore.fetchLocks();
- }
-
- @Override
- public java.util.Optional<ILock> fetchLock(ILockKey lockKey) {
- return this.lockStore.fetchLock(lockKey);
- }
-
- @Override
public Optional<IHostAttributes> getHostAttributes(String host) {
return this.attributeStore.getHostAttributes(host);
}
@@ -443,7 +395,7 @@ class WriteAheadStorage implements
}
@Override
- public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+ public Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
return this.jobUpdateStore.fetchAllJobUpdateDetails();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
index d190add..826cee9 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -32,7 +32,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -50,10 +49,7 @@ import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateState;
import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -64,8 +60,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
import static java.util.Objects.requireNonNull;
@@ -78,14 +72,12 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
.reverse()
.onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
- private final Map<IJobUpdateKey, UpdateAndLock> updates = Maps.newConcurrentMap();
- private final LockStore lockStore;
+ private final Map<IJobUpdateKey, IJobUpdateDetails> updates = Maps.newConcurrentMap();
private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
@Inject
- public MemJobUpdateStore(LockStore.Mutable lockStore, StatsProvider statsProvider) {
- this.lockStore = lockStore;
+ public MemJobUpdateStore(StatsProvider statsProvider) {
this.jobUpdateEventStats = CacheBuilder.newBuilder()
.build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
@Override
@@ -125,13 +117,13 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
@Timed("job_update_store_fetch_details")
@Override
public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
- return Optional.fromNullable(updates.get(key)).transform(u -> u.details);
+ return Optional.fromNullable(updates.get(key));
}
@Timed("job_update_store_fetch_update")
@Override
public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
- return Optional.fromNullable(updates.get(key)).transform(u -> u.details.getUpdate());
+ return Optional.fromNullable(updates.get(key)).transform(IJobUpdateDetails::getUpdate);
}
@Timed("job_update_store_fetch_instructions")
@@ -140,37 +132,13 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
IJobUpdateKey key) {
return Optional.fromNullable(updates.get(key))
- .transform(u -> u.details.getUpdate().getInstructions());
- }
-
- private void refreshLocks() {
- // Simulate database behavior of join performed against locks, used to populate lockToken field.
-
- ImmutableMap.Builder<IJobUpdateKey, UpdateAndLock> refreshed = ImmutableMap.builder();
- for (Map.Entry<IJobUpdateKey, UpdateAndLock> entry : updates.entrySet()) {
- IJobUpdateDetails update = entry.getValue().details;
- Optional<String> updateLock = entry.getValue().lockToken;
- if (updateLock.isPresent()) {
- // Determine if token needs to be cleared to reflect lock store state. The token may only
- // remain if the lock store token exists and matches.
- Optional<String> storedLock = Optional.fromNullable(lockStore.fetchLock(ILockKey.build(
- LockKey.job(entry.getKey().getJob().newBuilder()))).map(ILock::getToken).orElse(null));
- if (!storedLock.isPresent() || !updateLock.equals(storedLock)) {
- refreshed.put(entry.getKey(), new UpdateAndLock(update, Optional.absent()));
- }
- }
- }
-
- updates.putAll(refreshed.build());
+ .transform(u -> u.getUpdate().getInstructions());
}
@Timed("job_update_store_fetch_all_details")
@Override
- public synchronized Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
- refreshLocks();
- return updates.values().stream()
- .map(u -> new StoredJobUpdateDetails(u.details.newBuilder(), u.lockToken.orNull()))
- .collect(Collectors.toSet());
+ public synchronized Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
+ return ImmutableSet.copyOf(updates.values());
}
@Timed("job_update_store_fetch_instance_events")
@@ -180,7 +148,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
int instanceId) {
return java.util.Optional.ofNullable(updates.get(key))
- .map(u -> u.details.getInstanceEvents())
+ .map(IJobUpdateDetails::getInstanceEvents)
.orElse(ImmutableList.of())
.stream()
.filter(e -> e.getInstanceId() == instanceId)
@@ -210,7 +178,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
@Timed("job_update_store_save_update")
@Override
- public synchronized void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
+ public synchronized void saveJobUpdate(IJobUpdate update) {
requireNonNull(update);
validateInstructions(update.getInstructions());
@@ -224,9 +192,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
.setInstanceEvents(ImmutableList.of());
mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
- updates.put(
- update.getSummary().getKey(),
- new UpdateAndLock(IJobUpdateDetails.build(mutable), lockToken));
+ updates.put(update.getSummary().getKey(), IJobUpdateDetails.build(mutable));
}
private static final Ordering<JobUpdateEvent> EVENT_ORDERING = Ordering.natural()
@@ -235,16 +201,16 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
@Timed("job_update_store_save_event")
@Override
public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
- UpdateAndLock update = updates.get(key);
+ IJobUpdateDetails update = updates.get(key);
if (update == null) {
throw new StorageException("Update not found: " + key);
}
- JobUpdateDetails mutable = update.details.newBuilder();
+ JobUpdateDetails mutable = update.newBuilder();
mutable.addToUpdateEvents(event.newBuilder());
mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents()));
mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
- updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken));
+ updates.put(key, IJobUpdateDetails.build(mutable));
jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
}
@@ -257,16 +223,16 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
IJobUpdateKey key,
IJobInstanceUpdateEvent event) {
- UpdateAndLock update = updates.get(key);
+ IJobUpdateDetails update = updates.get(key);
if (update == null) {
throw new StorageException("Update not found: " + key);
}
- JobUpdateDetails mutable = update.details.newBuilder();
+ JobUpdateDetails mutable = update.newBuilder();
mutable.addToInstanceEvents(event.newBuilder());
mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents()));
mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
- updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken));
+ updates.put(key, IJobUpdateDetails.build(mutable));
jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
}
@@ -283,7 +249,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
long historyPruneThresholdMs) {
Supplier<Stream<IJobUpdateSummary>> completedUpdates = () -> updates.values().stream()
- .map(u -> u.details.getUpdate().getSummary())
+ .map(u -> u.getUpdate().getSummary())
.filter(s -> TERMINAL_STATES.contains(s.getState().getStatus()));
Predicate<IJobUpdateSummary> expiredFilter =
@@ -311,7 +277,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
pruneBuilder.addAll(creationOrder
.leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount)
.stream()
- .map(s -> s.getKey())
+ .map(IJobUpdateSummary::getKey)
.iterator());
}
}
@@ -372,7 +338,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
// TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as
// such here. The behavior is carried over from DbJobupdateStore; determine if it is desired.
Stream<IJobUpdateDetails> matches = updates.values().stream()
- .map(u -> u.details)
.filter(filter)
.sorted(REVERSE_LAST_MODIFIED_ORDER)
.skip(query.getOffset());
@@ -383,14 +348,4 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
return matches;
}
-
- private static final class UpdateAndLock {
- private final IJobUpdateDetails details;
- private final Optional<String> lockToken;
-
- UpdateAndLock(IJobUpdateDetails details, Optional<String> lockToken) {
- this.details = details;
- this.lockToken = lockToken;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
deleted file mode 100644
index 4c7bda8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
-
-/**
- * An in-memory lock store.
- */
-class MemLockStore implements LockStore.Mutable {
-
- private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
-
- @Override
- public void saveLock(ILock lock) {
- // TODO(wfarner): Re-evaluate, this is not idempotent.
- if (locks.containsKey(lock.getKey())) {
- throw new StorageException("Duplicate lock key");
- }
- if (FluentIterable.from(locks.values())
- .transform(ILock::getToken)
- .anyMatch(Predicates.equalTo(lock.getToken()))) {
-
- throw new StorageException("Duplicate token");
- }
-
- locks.put(lock.getKey(), lock);
- }
-
- @Override
- public void removeLock(ILockKey lockKey) {
- locks.remove(lockKey);
- }
-
- @Override
- public void deleteLocks() {
- locks.clear();
- }
-
- @Override
- public Set<ILock> fetchLocks() {
- return ImmutableSet.copyOf(locks.values());
- }
-
- @Override
- public Optional<ILock> fetchLock(ILockKey lockKey) {
- return Optional.ofNullable(locks.get(lockKey));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
index 7ace104..9f324b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -19,7 +19,6 @@ import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -36,7 +35,6 @@ public class MemStorage implements Storage {
@Volatile final SchedulerStore.Mutable schedulerStore,
@Volatile final CronJobStore.Mutable jobStore,
@Volatile final TaskStore.Mutable taskStore,
- @Volatile final LockStore.Mutable lockStore,
@Volatile final QuotaStore.Mutable quotaStore,
@Volatile final AttributeStore.Mutable attributeStore,
@Volatile final JobUpdateStore.Mutable updateStore) {
@@ -63,11 +61,6 @@ public class MemStorage implements Storage {
}
@Override
- public LockStore.Mutable getLockStore() {
- return lockStore;
- }
-
- @Override
public QuotaStore.Mutable getQuotaStore() {
return quotaStore;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
index 2ad84eb..edcea09 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
@@ -30,7 +30,6 @@ import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -74,7 +73,6 @@ public final class MemStorageModule extends PrivateModule {
bindStore(TaskStore.Mutable.class, MemTaskStore.class);
bindStore(CronJobStore.Mutable.class, MemCronJobStore.class);
bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
- bindStore(LockStore.Mutable.class, MemLockStore.class);
bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
bindStore(JobUpdateStore.Mutable.class, MemJobUpdateStore.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 534ae59..2cc567d 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -438,17 +438,6 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
return readOnlyScheduler.getTierConfigs();
}
- private void validateLockForTasks(Iterable<IScheduledTask> tasks) throws JobUpdatingException {
- ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks)
- .transform(Tasks::getJob)
- .toSet();
-
- // Validate lock against every unique job key derived from the tasks.
- for (IJobKey key : uniqueKeys) {
- jobUpdateController.assertNotUpdating(key);
- }
- }
-
private static Query.Builder implicitKillQuery(Query.Builder query) {
// Unless statuses were specifically supplied, only attempt to kill active tasks.
return query.get().getStatuses().isEmpty() ? query.byStatus(ACTIVE_STATES) : query;
@@ -470,13 +459,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
}
return storage.write(storeProvider -> {
- Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
try {
- validateLockForTasks(tasks);
+ jobUpdateController.assertNotUpdating(jobKey);
} catch (JobUpdatingException e) {
return error(JOB_UPDATING_ERROR, e);
}
+ Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
+
LOG.info("Killing tasks matching " + query);
int tasksKilled = 0;
@@ -678,6 +668,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
public Response addInstances(InstanceKey key, int count) {
IJobKey jobKey = JobKeys.assertValid(IJobKey.build(key.getJobKey()));
+ if (count <= 0) {
+ return invalidRequest(INVALID_INSTANCE_COUNT);
+ }
+
Response response = empty();
return storage.write(storeProvider -> {
try {
@@ -690,10 +684,6 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
FluentIterable<IScheduledTask> currentTasks = FluentIterable.from(
storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()));
- if (count <= 0) {
- return invalidRequest(INVALID_INSTANCE_COUNT);
- }
-
Optional<IScheduledTask> templateTask = Iterables.tryFind(
currentTasks,
e -> e.getAssignedTask().getInstanceId() == key.getInstanceId());
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 27c0b43..dc8d11c 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -48,8 +48,6 @@ import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.base.InstanceKeys;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.state.LockManager;
-import org.apache.aurora.scheduler.state.LockManager.LockException;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -65,7 +63,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
-import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;
import org.slf4j.Logger;
@@ -112,7 +109,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
"Unexpected problem running asynchronous updater for: %s. Triggering shutdown";
private final UpdateFactory updateFactory;
- private final LockManager lockManager;
private final Storage storage;
private final ScheduledExecutorService executor;
private final StateManager stateManager;
@@ -130,7 +126,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
@Inject
JobUpdateControllerImpl(
UpdateFactory updateFactory,
- LockManager lockManager,
Storage storage,
ScheduledExecutorService executor,
StateManager stateManager,
@@ -140,7 +135,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
TaskEventBatchWorker batchWorker) {
this.updateFactory = requireNonNull(updateFactory);
- this.lockManager = requireNonNull(lockManager);
this.storage = requireNonNull(storage);
this.executor = requireNonNull(executor);
this.stateManager = requireNonNull(stateManager);
@@ -187,16 +181,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
LOG.info("Starting update for job " + job);
- ILock lock;
- try {
- lock = lockManager.acquireLock(job, auditData.getUser());
- } catch (LockException e) {
- throw new UpdateStateException(e.getMessage(), e);
- }
- storeProvider.getJobUpdateStore().saveJobUpdate(
- update,
- Optional.of(requireNonNull(lock.getToken())));
+ storeProvider.getJobUpdateStore().saveJobUpdate(update);
JobUpdateStatus status = ROLLING_FORWARD;
if (isCoordinatedUpdate(instructions)) {
@@ -474,8 +460,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
JobUpdateEvent proposedEvent,
boolean record) throws UpdateStateException {
- JobUpdateStatus status = proposedEvent.getStatus();
JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
+ JobUpdateStatus status = proposedEvent.getStatus();
LOG.info("Update {} is now in state {}", key, status);
if (record) {
@@ -485,7 +471,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
- lockManager.releaseLock(key.getJob());
pulseHandler.remove(key);
} else {
pulseHandler.updatePulseStatus(key, status);
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index 8301b19..a3f6941 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -59,7 +59,6 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.scheduling.TaskGroups;
import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.scheduling.TaskScheduler;
-import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.stats.StatsModule;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
@@ -128,7 +127,6 @@ public abstract class AbstractJettyTest extends EasyMockTest {
5));
bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor);
bindMock(CronJobManager.class);
- bindMock(LockManager.class);
bindMock(OfferManager.class);
bindMock(RescheduleCalculator.class);
bindMock(TaskScheduler.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
deleted file mode 100644
index 8e19794..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.UUID;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.state.LockManager.LockException;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class LockManagerImplTest extends EasyMockTest {
- private static final String USER = "jim-user";
- private static final String MY_JOB = "myJob";
- private static final IJobKey JOB_KEY = JobKeys.from("jim", "devel", MY_JOB);
- private static final UUID TOKEN = UUID.fromString("79d6d790-3212-11e3-aa6e-0800200c9a66");
-
- private LockManager lockManager;
- private long timestampMs;
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Before
- public void setUp() throws Exception {
- FakeClock clock = new FakeClock();
- clock.advance(Amount.of(12345L, Time.SECONDS));
- timestampMs = clock.nowMillis();
-
- UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class);
- expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
-
- lockManager = new LockManagerImpl(MemStorageModule.newEmptyStorage(), clock, tokenGenerator);
- }
-
- @Test
- public void testAcquireLock() throws Exception {
- control.replay();
-
- ILock expected = ILock.build(new Lock()
- .setKey(LockKey.job(JOB_KEY.newBuilder()))
- .setToken(TOKEN.toString())
- .setTimestampMs(timestampMs)
- .setUser(USER));
-
- ILock actual = lockManager.acquireLock(JOB_KEY, USER);
- assertEquals(expected, actual);
- }
-
- @Test
- public void testAcquireLockInProgress() throws Exception {
- control.replay();
-
- expectLockException(JOB_KEY);
- lockManager.acquireLock(JOB_KEY, USER);
- lockManager.acquireLock(JOB_KEY, USER);
- }
-
- @Test
- public void testReleaseLock() throws Exception {
- control.replay();
-
- lockManager.acquireLock(JOB_KEY, USER);
- lockManager.releaseLock(JOB_KEY);
-
- // Should be able to lock again after releasing.
- lockManager.acquireLock(JOB_KEY, USER);
- }
-
- private void expectLockException(IJobKey key) {
- expectedException.expect(LockException.class);
- expectedException.expectMessage(JobKeys.canonicalString(key));
- }
-}