You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/25 00:25:48 UTC
aurora git commit: Simplify storage bindings,
remove shims that are now mostly getting in the way.
Repository: aurora
Updated Branches:
refs/heads/master 7f3e4e3c5 -> 65df91bfd
Simplify storage bindings, remove shims that are now mostly getting in the way.
Reviewed at https://reviews.apache.org/r/33530/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/65df91bf
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/65df91bf
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/65df91bf
Branch: refs/heads/master
Commit: 65df91bfd7e3a2ada38a5fe4d620e6373d0f59bf
Parents: 7f3e4e3
Author: Bill Farner <wf...@apache.org>
Authored: Fri Apr 24 15:15:48 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Apr 24 15:15:48 2015 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 4 +-
.../aurora/benchmark/ThriftApiBenchmarks.java | 5 +-
.../aurora/benchmark/UpdateStoreBenchmarks.java | 5 +-
.../aurora/scheduler/app/SchedulerMain.java | 12 +-
.../storage/backup/TemporaryStorage.java | 4 +-
.../aurora/scheduler/storage/db/DbModule.java | 7 +
.../aurora/scheduler/storage/db/DbStorage.java | 17 +-
.../aurora/scheduler/storage/db/DbUtil.java | 42 +++
.../scheduler/storage/db/MigrationModule.java | 60 -----
.../scheduler/storage/log/LogStorage.java | 30 +--
.../storage/mem/InMemStoresModule.java | 52 ++++
.../scheduler/storage/mem/MemStorage.java | 163 ------------
.../scheduler/storage/mem/MemStorageModule.java | 75 ------
.../scheduler/app/local/LocalSchedulerMain.java | 3 +-
.../scheduler/async/TaskSchedulerImplTest.java | 4 +-
.../scheduler/async/TaskSchedulerTest.java | 4 +-
.../cron/quartz/AuroraCronJobTest.java | 8 +-
.../aurora/scheduler/cron/quartz/CronIT.java | 4 +-
.../cron/quartz/CronJobManagerImplTest.java | 4 +-
.../scheduler/state/LockManagerImplTest.java | 4 +-
.../scheduler/state/StateManagerImplTest.java | 4 +-
.../scheduler/stats/ResourceCounterTest.java | 4 +-
.../scheduler/storage/StorageBackfillTest.java | 4 +-
.../scheduler/storage/db/DbStorageTest.java | 4 +
.../aurora/scheduler/storage/db/DbUtil.java | 34 ---
.../scheduler/storage/mem/MemStorageTest.java | 257 ------------------
.../scheduler/storage/mem/MemTaskStoreTest.java | 12 +-
.../storage/mem/StorageTransactionTest.java | 259 +++++++++++++++++++
.../aurora/scheduler/updater/JobUpdaterIT.java | 6 +-
29 files changed, 413 insertions(+), 678 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 12f42a9..372addc 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -58,10 +58,10 @@ import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.ExecutorSettings;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -104,7 +104,7 @@ public class SchedulingBenchmarks {
*/
@Setup(Level.Trial)
public void setUpBenchmark() {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
eventBus = new EventBus();
final FakeClock clock = new FakeClock();
clock.setNowMillis(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index d20b088..6ec0e14 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -35,8 +35,6 @@ import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.db.DbModule;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.thrift.ThriftModule;
import org.apache.thrift.TException;
import org.openjdk.jmh.annotations.Benchmark;
@@ -93,8 +91,7 @@ public class ThriftApiBenchmarks {
bind(LockManager.class).toInstance(createThrowingFake(LockManager.class));
}
},
- new MemStorageModule(Bindings.KeyFactory.PLAIN),
- new DbModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)),
+ DbModule.testModule(Bindings.KeyFactory.PLAIN),
new ThriftModule.ReadOnly());
api = injector.getInstance(ReadOnlyScheduler.Iface.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index c7456aa..a4abbd8 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -50,8 +50,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.thrift.TException;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -91,8 +89,7 @@ public class UpdateStoreBenchmarks {
bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
}
},
- new MemStorageModule(Bindings.KeyFactory.PLAIN),
- new DbModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)));
+ DbModule.testModule(Bindings.KeyFactory.PLAIN));
storage = injector.getInstance(Storage.class);
storage.prepare();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 087abe5..3d19831 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -55,14 +55,11 @@ import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
import org.apache.aurora.scheduler.mesos.ExecutorSettings;
import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
+import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.db.DbModule;
-import org.apache.aurora.scheduler.storage.db.MigrationModule;
-import org.apache.aurora.scheduler.storage.log.LogStorage;
import org.apache.aurora.scheduler.storage.log.LogStorageModule;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.mem.MemStorage.Delegated;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import static com.twitter.common.logging.RootLogConfig.Configuration;
@@ -157,13 +154,8 @@ public class SchedulerMain extends AbstractApplication {
.add(new AppModule(clusterName, serverSetPath, zkClientConfig, statsURLPrefix))
.addAll(getExtraModules())
.add(getPersistentStorageModule())
- .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
.add(new CronModule())
- .add(new DbModule(Bindings.annotatedKeyFactory(Delegated.class)))
- .add(new MigrationModule(
- Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class),
- Bindings.annotatedKeyFactory(Delegated.class))
- )
+ .add(new DbModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)))
.build();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 586b53b..23c0c1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -28,9 +28,9 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
/**
* A short-lived in-memory storage system that can be converted to a {@link Snapshot}.
@@ -66,7 +66,7 @@ interface TemporaryStorage {
class TemporaryStorageFactory implements Function<Snapshot, TemporaryStorage> {
@Override
public TemporaryStorage apply(Snapshot snapshot) {
- final Storage storage = MemStorage.newEmptyStorage();
+ final Storage storage = DbUtil.createStorage();
FakeClock clock = new FakeClock();
clock.setNowMillis(snapshot.getTimestamp());
final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(clock, storage);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 439d8cc..d6ca430 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -25,12 +25,15 @@ import com.google.inject.PrivateModule;
import com.twitter.common.inject.Bindings;
import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.db.typehandlers.TypeHandlers;
+import org.apache.aurora.scheduler.storage.mem.InMemStoresModule;
import org.apache.ibatis.session.AutoMappingBehavior;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
@@ -130,6 +133,10 @@ public class DbModule extends PrivateModule {
addTypeHandlersClasses(TypeHandlers.getAll());
}
});
+ install(new InMemStoresModule(keyFactory));
+ expose(keyFactory.create(CronJobStore.Mutable.class));
+ expose(keyFactory.create(TaskStore.Mutable.class));
+
bindStore(AttributeStore.Mutable.class, DbAttributeStore.class);
bindStore(LockStore.Mutable.class, DbLockStore.class);
bindStore(QuotaStore.Mutable.class, DbQuotaStore.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 49db52d..fbdbb05 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
+import com.twitter.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateStatus;
@@ -49,11 +50,6 @@ import static org.apache.ibatis.mapping.SqlCommandType.UPDATE;
* A storage implementation backed by a relational database.
* <p>
* Delegates read and write concurrency semantics to the underlying database.
- * This class is currently only partially implemented, with the underlying
- * {@link MutableStoreProvider} only providing some, but not all, store implementations. It is
- * designed to be a long term replacement for
- * {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.
- * </p>
*/
class DbStorage extends AbstractIdleService implements Storage {
@@ -65,6 +61,8 @@ class DbStorage extends AbstractIdleService implements Storage {
DbStorage(
SqlSessionFactory sessionFactory,
EnumValueMapper enumValueMapper,
+ final CronJobStore.Mutable cronJobStore,
+ final TaskStore.Mutable taskStore,
final SchedulerStore.Mutable schedulerStore,
final AttributeStore.Mutable attributeStore,
final LockStore.Mutable lockStore,
@@ -86,17 +84,17 @@ class DbStorage extends AbstractIdleService implements Storage {
@Override
public CronJobStore.Mutable getCronJobStore() {
- throw new UnsupportedOperationException("Not yet implemented.");
+ return cronJobStore;
}
@Override
public TaskStore getTaskStore() {
- throw new UnsupportedOperationException("Not yet implemented.");
+ return taskStore;
}
@Override
public TaskStore.Mutable getUnsafeTaskStore() {
- throw new UnsupportedOperationException("Not yet implemented.");
+ return taskStore;
}
@Override
@@ -121,6 +119,7 @@ class DbStorage extends AbstractIdleService implements Storage {
};
}
+ @Timed("db_storage_read_operation")
@Override
@Transactional
public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
@@ -131,6 +130,7 @@ class DbStorage extends AbstractIdleService implements Storage {
}
}
+ @Timed("db_storage_write_operation")
@Override
@Transactional
public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
@@ -148,6 +148,7 @@ class DbStorage extends AbstractIdleService implements Storage {
// TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless,
// resulting in no performance gain. Figure out why.
+ @Timed("db_storage_bulk_load_operation")
@Override
public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
throws StorageException, E {
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
new file mode 100644
index 0000000..eae1770
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.inject.Bindings;
+
+import org.apache.aurora.scheduler.storage.Storage;
+
+/**
+ * Utility class for creating ad-hoc storage instances.
+ */
+public final class DbUtil {
+
+ private DbUtil() {
+ // Utility class.
+ }
+
+ /**
+ * Creates a new, empty storage system.
+ *
+ * @return A new storage instance.
+ */
+ public static Storage createStorage() {
+ Injector injector = Guice.createInjector(DbModule.testModule(Bindings.KeyFactory.PLAIN));
+ Storage storage = injector.getInstance(Storage.class);
+ storage.prepare();
+ return storage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
deleted file mode 100644
index a821de3..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/MigrationModule.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import com.google.inject.AbstractModule;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Temporary module to wire the two partial storage implementations together as we
- * migrate from MemStorage to DbStorage. This accepts two {@link KeyFactory}s,
- * one that references the binding scope for the feature-complete write-behind
- * volatile storage system, and one for the binding scope of the new and partially-implemented
- * storage system.
- * <p>
- * Once the new storage system is feature-complete, this module will be deleted
- * as the binding bridge is no longer necessary.
- * </p>
- */
-public class MigrationModule extends AbstractModule {
-
- private final KeyFactory toFactory;
- private final KeyFactory fromFactory;
-
- public MigrationModule(KeyFactory from, KeyFactory to) {
- this.fromFactory = requireNonNull(from);
- this.toFactory = requireNonNull(to);
- }
-
- private <T> void link(Class<T> clazz) {
- bind(fromFactory.create(clazz)).to(toFactory.create(clazz));
- }
-
- @Override
- protected void configure() {
- link(AttributeStore.Mutable.class);
- link(LockStore.Mutable.class);
- link(QuotaStore.Mutable.class);
- link(SchedulerStore.Mutable.class);
- link(JobUpdateStore.Mutable.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index bb59cdf..c58f531 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -14,10 +14,6 @@
package org.apache.aurora.scheduler.storage.log;
import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@@ -28,7 +24,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
-import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -212,15 +207,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
new SlidingStats("log_storage_write_lock_wait", "ns");
private final AtomicLong droppedUpdateEvents = Stats.exportLong("dropped_update_events");
- /**
- * Identifies a local storage layer that is written to only after first ensuring the write
- * operation is persisted in the log.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @Qualifier
- public @interface WriteBehind { }
-
private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions;
private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
@@ -230,14 +216,14 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
ShutdownRegistry shutdownRegistry,
Settings settings,
SnapshotStore<Snapshot> snapshotStore,
- @WriteBehind Storage storage,
- @WriteBehind SchedulerStore.Mutable schedulerStore,
- @WriteBehind CronJobStore.Mutable jobStore,
- @WriteBehind TaskStore.Mutable taskStore,
- @WriteBehind LockStore.Mutable lockStore,
- @WriteBehind QuotaStore.Mutable quotaStore,
- @WriteBehind AttributeStore.Mutable attributeStore,
- @WriteBehind JobUpdateStore.Mutable jobUpdateStore,
+ @Volatile Storage storage,
+ @Volatile SchedulerStore.Mutable schedulerStore,
+ @Volatile CronJobStore.Mutable jobStore,
+ @Volatile TaskStore.Mutable taskStore,
+ @Volatile LockStore.Mutable lockStore,
+ @Volatile QuotaStore.Mutable quotaStore,
+ @Volatile AttributeStore.Mutable attributeStore,
+ @Volatile JobUpdateStore.Mutable jobUpdateStore,
EventSink eventSink,
ReentrantLock writeLock) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
new file mode 100644
index 0000000..88fd006
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.twitter.common.inject.Bindings.KeyFactory;
+
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for an in-memory storage system.
+ * <p>
+ * NOTE: These stores are being phased out in favor of database-backed stores.
+ */
+public final class InMemStoresModule extends AbstractModule {
+
+ private final KeyFactory keyFactory;
+
+ public InMemStoresModule(KeyFactory keyFactory) {
+ this.keyFactory = requireNonNull(keyFactory);
+ }
+
+ private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+ bind(binding).to(impl);
+ bind(impl).in(Singleton.class);
+ Key<T> key = keyFactory.create(binding);
+ bind(key).to(impl);
+ }
+
+ @Override
+ protected void configure() {
+ bindStore(CronJobStore.Mutable.class, MemJobStore.class);
+ bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
deleted file mode 100644
index c5ccccd..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbModule;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A storage implementation comprised of individual in-memory store implementations.
- */
-public class MemStorage implements Storage {
- private final MutableStoreProvider storeProvider;
- private final Storage delegatedStore;
-
- /**
- * Identifies a storage layer to be delegated to instead of mem storage.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @Qualifier
- public @interface Delegated { }
-
- @VisibleForTesting
- static final String THREADS_WAITING_GAUGE = "storage_lock_threads_waiting";
-
- @Inject
- MemStorage(
- @Delegated final SchedulerStore.Mutable schedulerStore,
- final CronJobStore.Mutable jobStore,
- final TaskStore.Mutable taskStore,
- @Delegated final LockStore.Mutable lockStore,
- @Delegated final Storage delegated,
- @Delegated final QuotaStore.Mutable quotaStore,
- @Delegated final AttributeStore.Mutable attributeStore,
- @Delegated final JobUpdateStore.Mutable updateStore) {
-
- this.delegatedStore = requireNonNull(delegated);
- storeProvider = new MutableStoreProvider() {
- @Override
- public SchedulerStore.Mutable getSchedulerStore() {
- return schedulerStore;
- }
-
- @Override
- public CronJobStore.Mutable getCronJobStore() {
- return jobStore;
- }
-
- @Override
- public TaskStore getTaskStore() {
- return taskStore;
- }
-
- @Override
- public TaskStore.Mutable getUnsafeTaskStore() {
- return taskStore;
- }
-
- @Override
- public LockStore.Mutable getLockStore() {
- return lockStore;
- }
-
- @Override
- public QuotaStore.Mutable getQuotaStore() {
- return quotaStore;
- }
-
- @Override
- public AttributeStore.Mutable getAttributeStore() {
- return attributeStore;
- }
-
- @Override
- public JobUpdateStore.Mutable getJobUpdateStore() {
- return updateStore;
- }
- };
- }
-
- /**
- * Creates a new empty in-memory storage for use in testing.
- */
- @VisibleForTesting
- public static Storage newEmptyStorage() {
- Injector injector = Guice.createInjector(
- DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
- new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)));
- Storage storage = injector.getInstance(Key.get(Storage.class, Volatile.class));
- storage.prepare();
- return storage;
- }
-
- @Timed("mem_storage_read_operation")
- @Override
- public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E {
- return delegatedStore.read(new Work<T, E>() {
- @Override
- public T apply(StoreProvider provider) throws E {
- return work.apply(storeProvider);
- }
- });
- }
-
- @Timed("mem_storage_write_operation")
- @Override
- public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
- return delegatedStore.write(new MutateWork<T, E>() {
- @Override
- public T apply(MutableStoreProvider provider) throws E {
- return work.apply(storeProvider);
- }
- });
- }
-
- @Timed("mem_storage_bulk_load_operation")
- @Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- delegatedStore.bulkLoad(work);
- }
-
- @Override
- public void prepare() throws StorageException {
- delegatedStore.prepare();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
deleted file mode 100644
index 9068aa4..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.twitter.common.inject.Bindings.KeyFactory;
-
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.TaskStore;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Binding module for an in-memory storage system.
- * <p>
- * Exposes bindings for storage components:
- * <ul>
- * <li>{@link org.apache.aurora.scheduler.storage.Storage}</li>
- * <li>Keyed with keys provided by the provided{@code keyFactory}:</li>
- * <ul>
- * <li>{@link org.apache.aurora.scheduler.storage.SchedulerStore}</li>
- * <li>{@link org.apache.aurora.scheduler.storage.CronJobStore}</li>
- * <li>{@link org.apache.aurora.scheduler.storage.TaskStore}</li>
- * <li>{@link org.apache.aurora.scheduler.storage.LockStore}</li>
- * <li>{@link org.apache.aurora.scheduler.storage.QuotaStore}</li>
- * <li>{@link org.apache.aurora.scheduler.storage.AttributeStore}</li>
- * </ul>
- * </ul>
- */
-public final class MemStorageModule extends PrivateModule {
-
- private final KeyFactory keyFactory;
-
- public MemStorageModule(KeyFactory keyFactory) {
- this.keyFactory = requireNonNull(keyFactory);
- }
-
- private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
- bind(binding).to(impl);
- bind(impl).in(Singleton.class);
- Key<T> key = keyFactory.create(binding);
- bind(key).to(impl);
- expose(key);
- }
-
- @Override
- protected void configure() {
- Key<Storage> storageKey = keyFactory.create(Storage.class);
- bind(storageKey).to(MemStorage.class);
- expose(storageKey);
- Key<Storage> exposedMemStorageKey = Key.get(Storage.class, Volatile.class);
- bind(exposedMemStorageKey).to(MemStorage.class);
- expose(exposedMemStorageKey);
- bind(MemStorage.class).in(Singleton.class);
-
- bindStore(CronJobStore.Mutable.class, MemJobStore.class);
- bindStore(TaskStore.Mutable.class, MemTaskStore.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 4a8d404..a91c4a2 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -35,7 +35,6 @@ import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.log.LogStorage;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.shiro.io.ResourceUtils;
@@ -58,7 +57,7 @@ public class LocalSchedulerMain extends SchedulerMain {
return new AbstractModule() {
@Override
protected void configure() {
- bind(Storage.class).to(Key.get(Storage.class, LogStorage.WriteBehind.class));
+ bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index bd1aaa1..53b21cb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -48,9 +48,9 @@ import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.TaskInfo;
import org.easymock.Capture;
@@ -243,7 +243,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
// Ensures that tasks in THROTTLED state are not considered part of the active job state passed
// to the assigner function.
- Storage memStorage = MemStorage.newEmptyStorage();
+ Storage memStorage = DbUtil.createStorage();
Injector injector = getInjector(memStorage);
scheduler = injector.getInstance(TaskScheduler.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9035484..f17c434 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -62,11 +62,11 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskID;
@@ -130,7 +130,7 @@ public class TaskSchedulerTest extends EasyMockTest {
@Before
public void setUp() {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
maintenance = createMock(MaintenanceController.class);
stateManager = createMock(StateManager.class);
assigner = createMock(TaskAssigner.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 91cf5ed..831803f 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -27,10 +27,10 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -54,7 +54,7 @@ public class AuroraCronJobTest extends EasyMockTest {
@Before
public void setUp() {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
stateManager = createMock(StateManager.class);
backoffHelper = createMock(BackoffHelper.class);
@@ -94,11 +94,11 @@ public class AuroraCronJobTest extends EasyMockTest {
control.replay();
populateStorage(CronCollisionPolicy.CANCEL_NEW);
auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
populateStorage(CronCollisionPolicy.KILL_EXISTING);
auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
populateStorage(CronCollisionPolicy.RUN_OVERLAP);
auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index d4f8b5b..863e9c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -33,9 +33,9 @@ import org.apache.aurora.scheduler.cron.CrontabEntry;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
@@ -83,7 +83,7 @@ public class CronIT extends EasyMockTest {
@Before
public void setUp() throws Exception {
stateManager = createMock(StateManager.class);
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
auroraCronJob = createMock(AuroraCronJob.class);
injector = Guice.createInjector(
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index d313326..abb915d 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -29,9 +29,9 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CrontabEntry;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +59,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
@Before
public void setUp() {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
scheduler = createMock(Scheduler.class);
cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index 2f14205..9c9cf1b 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -30,10 +30,10 @@ import org.apache.aurora.gen.Lock;
import org.apache.aurora.gen.LockKey;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.state.LockManager.LockException;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.ILockKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -71,7 +71,7 @@ public class LockManagerImplTest extends EasyMockTest {
tokenGenerator = createMock(UUIDGenerator.class);
expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
- lockManager = new LockManagerImpl(MemStorage.newEmptyStorage(), clock, tokenGenerator);
+ lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, tokenGenerator);
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index c7fd3e5..afb7db8 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.mesos.Protos.SlaveID;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -103,7 +103,7 @@ public class StateManagerImplTest extends EasyMockTest {
eventSink = createMock(EventSink.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
// TODO(William Farner): Use a mocked storage.
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
stateManager = new StateManagerImpl(
clock,
driver,
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index 82e9c76..7b12d64 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -33,11 +33,11 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.junit.Before;
import org.junit.Test;
@@ -72,7 +72,7 @@ public class ResourceCounterTest {
@Before
public void setUp() throws Exception {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
resourceCounter = new ResourceCounter(storage);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index 254b231..5ad0de7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.junit.Before;
import org.junit.Test;
@@ -48,7 +48,7 @@ public class StorageBackfillTest {
@Before
public void setUp() {
- storage = MemStorage.newEmptyStorage();
+ storage = DbUtil.createStorage();
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 743f5ba..b24fef9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.db;
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
@@ -25,6 +26,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
@@ -56,6 +58,8 @@ public class DbStorageTest extends EasyMockTest {
storage = new DbStorage(
sessionFactory,
enumMapper,
+ createMock(CronJobStore.Mutable.class),
+ createMock(TaskStore.Mutable.class),
createMock(SchedulerStore.Mutable.class),
createMock(AttributeStore.Mutable.class),
createMock(LockStore.Mutable.class),
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
deleted file mode 100644
index 1eaf3fe..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
-
-import org.apache.aurora.scheduler.storage.Storage;
-
-final class DbUtil {
-
- private DbUtil() {
- // Utility class.
- }
-
- static Storage createStorage() {
- Injector injector = Guice.createInjector(DbModule.testModule(Bindings.KeyFactory.PLAIN));
- Storage storage = injector.getInstance(Storage.class);
- storage.prepare();
- return storage;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
deleted file mode 100644
index 30e2328..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.testing.TearDown;
-import com.google.common.testing.junit4.TearDownTestCase;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers.
- */
-public class MemStorageTest extends TearDownTestCase {
-
- private ExecutorService executor;
- private Storage storage;
-
- @Before
- public void setUp() {
- executor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
- }
- });
- storage = MemStorage.newEmptyStorage();
- }
-
- @Test
- public void testConcurrentReaders() throws Exception {
- // Validate that a slow read does not block another read.
-
- final CountDownLatch slowReadStarted = new CountDownLatch(1);
- final CountDownLatch slowReadFinished = new CountDownLatch(1);
-
- Future<String> future = executor.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return storage.read(new Work.Quiet<String>() {
- @Override
- public String apply(StoreProvider storeProvider) {
- slowReadStarted.countDown();
- try {
- slowReadFinished.await();
- } catch (InterruptedException e) {
- fail(e.getMessage());
- }
- return "slowResult";
- }
- });
- }
- });
-
- slowReadStarted.await();
-
- String fastResult = storage.read(new Work.Quiet<String>() {
- @Override
- public String apply(StoreProvider storeProvider) {
- return "fastResult";
- }
- });
- assertEquals("fastResult", fastResult);
- slowReadFinished.countDown();
- assertEquals("slowResult", future.get());
- }
-
- private IScheduledTask makeTask(String taskId) {
- return IScheduledTask.build(new ScheduledTask().setAssignedTask(
- new AssignedTask()
- .setTaskId(taskId)
- .setTask(new TaskConfig()
- .setOwner(new Identity().setRole("owner-" + taskId))
- .setJobName("job-" + taskId)
- .setEnvironment("env-" + taskId))));
- }
-
- private static class CustomException extends RuntimeException {
- }
-
- private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E> work) {
- try {
- storage.write(work);
- fail("Expected a CustomException.");
- } catch (CustomException e) {
- // Expected.
- }
- }
-
- private void expectTasks(final String... taskIds) {
- storage.read(new Work.Quiet<Void>() {
- @Override
- public Void apply(StoreProvider storeProvider) {
- Query.Builder query = Query.unscoped();
- Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids);
- return null;
- }
- });
- }
-
- @Test
- public void testWritesUnderTransaction() {
- final IResourceAggregate quota = IResourceAggregate
- .build(new ResourceAggregate().setDiskMb(100).setNumCpus(2.0).setRamMb(512));
-
- try {
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getQuotaStore().saveQuota("a", quota);
- throw new CustomException();
- }
- });
- fail("Expected CustomException to be thrown.");
- } catch (CustomException e) {
- // Expected
- }
-
- storage.read(new Work.Quiet<Void>() {
- @Override
- public Void apply(StoreProvider storeProvider) {
- // If the previous write was under a transaction then there would be no quota records.
- assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
- storeProvider.getQuotaStore().fetchQuotas());
- return null;
- }
- });
- }
-
- @Test
- public void testOperations() {
- expectWriteFail(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
- throw new CustomException();
- }
- });
- expectTasks("a", "b");
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
- }
- });
- expectTasks("a", "b");
-
- expectWriteFail(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().deleteAllTasks();
- throw new CustomException();
- }
- });
- expectTasks();
-
- expectWriteFail(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
- throw new CustomException();
- }
- });
- expectTasks("a");
- storage.read(new Work.Quiet<Void>() {
- @Override
- public Void apply(StoreProvider storeProvider) {
- assertEquals(
- makeTask("a"),
- Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(
- Query.taskScoped("a"))));
- return null;
- }
- });
-
- // Nested transaction where inner transaction fails.
- expectWriteFail(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
- throw new CustomException();
- }
- });
- }
- });
- expectTasks("a", "c", "d");
-
- // Nested transaction where outer transaction fails.
- expectWriteFail(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
- }
- });
- throw new CustomException();
- }
- });
- expectTasks("a", "c", "d");
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index 20c9204..688a02f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -25,10 +25,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.inject.Bindings.KeyFactory;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
@@ -48,7 +44,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
-import org.apache.aurora.scheduler.storage.db.DbModule;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -88,11 +84,7 @@ public class MemTaskStoreTest {
@Before
public void setUp() {
- Injector injector = Guice.createInjector(
- new MemStorageModule(KeyFactory.PLAIN),
- DbModule.testModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class)));
- storage = injector.getInstance(Storage.class);
- storage.prepare();
+ storage = DbUtil.createStorage();
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
new file mode 100644
index 0000000..bad9eb5
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers.
+ * TODO(wfarner): Merge this with DbStorageTest.
+ */
+public class StorageTransactionTest extends TearDownTestCase {
+
+ private ExecutorService executor;
+ private Storage storage;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
+ addTearDown(new TearDown() {
+ @Override
+ public void tearDown() {
+ new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+ }
+ });
+ storage = DbUtil.createStorage();
+ }
+
+ @Test
+ public void testConcurrentReaders() throws Exception {
+ // Validate that a slow read does not block another read.
+
+ final CountDownLatch slowReadStarted = new CountDownLatch(1);
+ final CountDownLatch slowReadFinished = new CountDownLatch(1);
+
+ Future<String> future = executor.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return storage.read(new Work.Quiet<String>() {
+ @Override
+ public String apply(StoreProvider storeProvider) {
+ slowReadStarted.countDown();
+ try {
+ slowReadFinished.await();
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ return "slowResult";
+ }
+ });
+ }
+ });
+
+ slowReadStarted.await();
+
+ String fastResult = storage.read(new Work.Quiet<String>() {
+ @Override
+ public String apply(StoreProvider storeProvider) {
+ return "fastResult";
+ }
+ });
+ assertEquals("fastResult", fastResult);
+ slowReadFinished.countDown();
+ assertEquals("slowResult", future.get());
+ }
+
+ private IScheduledTask makeTask(String taskId) {
+ return IScheduledTask.build(new ScheduledTask().setAssignedTask(
+ new AssignedTask()
+ .setTaskId(taskId)
+ .setTask(new TaskConfig()
+ .setOwner(new Identity().setRole("owner-" + taskId))
+ .setJobName("job-" + taskId)
+ .setEnvironment("env-" + taskId))));
+ }
+
+ private static class CustomException extends RuntimeException {
+ }
+
+ private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E> work) {
+ try {
+ storage.write(work);
+ fail("Expected a CustomException.");
+ } catch (CustomException e) {
+ // Expected.
+ }
+ }
+
+ private void expectTasks(final String... taskIds) {
+ storage.read(new Work.Quiet<Void>() {
+ @Override
+ public Void apply(StoreProvider storeProvider) {
+ Query.Builder query = Query.unscoped();
+ Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+ .transform(Tasks.SCHEDULED_TO_ID)
+ .toSet();
+ assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids);
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testWritesUnderTransaction() {
+ final IResourceAggregate quota = IResourceAggregate
+ .build(new ResourceAggregate().setDiskMb(100).setNumCpus(2.0).setRamMb(512));
+
+ try {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().saveQuota("a", quota);
+ throw new CustomException();
+ }
+ });
+ fail("Expected CustomException to be thrown.");
+ } catch (CustomException e) {
+ // Expected
+ }
+
+ storage.read(new Work.Quiet<Void>() {
+ @Override
+ public Void apply(StoreProvider storeProvider) {
+ // If the previous write was under a transaction then there would be no quota records.
+ assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
+ storeProvider.getQuotaStore().fetchQuotas());
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testOperations() {
+ expectWriteFail(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+ throw new CustomException();
+ }
+ });
+ expectTasks("a", "b");
+
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+ }
+ });
+ expectTasks("a", "b");
+
+ expectWriteFail(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteAllTasks();
+ throw new CustomException();
+ }
+ });
+ expectTasks();
+
+ expectWriteFail(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
+ throw new CustomException();
+ }
+ });
+ expectTasks("a");
+ storage.read(new Work.Quiet<Void>() {
+ @Override
+ public Void apply(StoreProvider storeProvider) {
+ assertEquals(
+ makeTask("a"),
+ Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(
+ Query.taskScoped("a"))));
+ return null;
+ }
+ });
+
+ // Nested transaction where inner transaction fails.
+ expectWriteFail(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+ throw new CustomException();
+ }
+ });
+ }
+ });
+ expectTasks("a", "c", "d");
+
+ // Nested transaction where outer transaction fails.
+ expectWriteFail(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+ }
+ });
+ throw new CustomException();
+ }
+ });
+ expectTasks("a", "c", "d");
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/65df91bf/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 802c090..010e75f 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -33,7 +33,6 @@ import com.google.common.eventbus.EventBus;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.twitter.common.inject.Bindings;
import com.twitter.common.inject.Bindings.KeyFactory;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
@@ -92,8 +91,6 @@ import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.ILockKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage.Delegated;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
@@ -174,8 +171,7 @@ public class JobUpdaterIT extends EasyMockTest {
Injector injector = Guice.createInjector(
new UpdaterModule(executor),
- DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
- new MemStorageModule(KeyFactory.PLAIN),
+ DbModule.testModule(KeyFactory.PLAIN),
new AbstractModule() {
@Override
protected void configure() {