You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/08 19:11:06 UTC
aurora git commit: Implementing db snapshotting
Repository: aurora
Updated Branches:
refs/heads/master a91a759d0 -> 26efe5517
Implementing db snapshotting
Bugs closed: AURORA-1627
Reviewed at https://reviews.apache.org/r/44471/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/26efe551
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/26efe551
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/26efe551
Branch: refs/heads/master
Commit: 26efe5517fc0cb471101fdcb072e5dbf5d20bc56
Parents: a91a759
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Mar 8 10:10:45 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Mar 8 10:10:45 2016 -0800
----------------------------------------------------------------------
NEWS | 2 +
.../thrift/org/apache/aurora/gen/storage.thrift | 3 +
config/checkstyle/checkstyle.xml | 3 -
.../org/apache/aurora/benchmark/JobUpdates.java | 50 +++-
.../aurora/benchmark/SnapshotBenchmarks.java | 69 ++---
.../aurora/benchmark/UpdateStoreBenchmarks.java | 55 +---
.../aurora/scheduler/storage/Storage.java | 13 +
.../storage/backup/TemporaryStorage.java | 5 +-
.../aurora/scheduler/storage/db/DbModule.java | 4 +-
.../aurora/scheduler/storage/db/DbStorage.java | 6 +
.../scheduler/storage/log/LogStorageModule.java | 6 +
.../storage/log/SnapshotStoreImpl.java | 175 +++++++++++--
.../storage/log/WriteAheadStorage.java | 6 +
.../scheduler/storage/backup/RecoveryTest.java | 12 +-
.../storage/log/SnapshotStoreImplIT.java | 262 +++++++++++++++++++
.../storage/log/SnapshotStoreImplTest.java | 196 --------------
.../sh/org/apache/aurora/e2e/test_end_to_end.sh | 1 +
17 files changed, 549 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index b84a945..7905451 100644
--- a/NEWS
+++ b/NEWS
@@ -14,6 +14,8 @@ Deprecations and removals:
- `TaskConfig.jobName`
- `TaskQuery.owner`
- Task ID strings are no longer prefixed by a timestamp.
+- Scheduler H2 in-memory database now uses MVStore: http://www.h2database.com/html/mvstore.html.
+ In addition, scheduler thrift snapshots now support full DB dumps for faster restarts.
0.12.0
------
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 6dc4614..9e4213f 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -149,6 +149,9 @@ struct Snapshot {
8: set<QuotaConfiguration> quotaConfigurations
9: set<api.Lock> locks
10: set<StoredJobUpdateDetails> jobUpdateDetails
+ 11: list<string> dbScript
+ // Whether the experimental DB store for tasks and cron jobs was enabled when the snapshot was cut.
+ 12: bool experimentalTaskStore
}
// A message header that calls out the number of expected FrameChunks to follow to form a complete
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 2074beb..abc0760 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -234,9 +234,6 @@ limitations under the License.
<module name="NestedForDepth">
<property name="max" value="2"/>
</module>
- <module name="NestedTryDepth">
- <property name="max" value="1"/>
- </module>
<module name="NoClone"/>
<module name="NoFinalizer"/>
<module name="SuperClone"/>
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index 50044e1..f4f8d00 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -17,6 +17,7 @@ import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -32,19 +33,66 @@ import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
/**
* Job update factory.
*/
final class JobUpdates {
+ private JobUpdates() {
+ // Utility class.
+ }
+
+ /**
+ * Saves job updates into provided storage.
+ *
+ * @param storage {@link Storage} instance.
+ * @param updates updates to save.
+ * @return update keys.
+ */
+ static Set<IJobUpdateKey> saveUpdates(Storage storage, Iterable<IJobUpdateDetails> updates) {
+ ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
+ storage.write((Storage.MutateWork.NoResult.Quiet) store -> {
+ JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
+ updateStore.deleteAllUpdatesAndEvents();
+ for (IJobUpdateDetails details : updates) {
+ IJobUpdateKey key = details.getUpdate().getSummary().getKey();
+ keyBuilder.add(key);
+ String lockToken = UUID.randomUUID().toString();
+ store.getLockStore().saveLock(ILock.build(new Lock()
+ .setKey(LockKey.job(key.getJob().newBuilder()))
+ .setToken(lockToken)
+ .setUser(Builder.USER)
+ .setTimestampMs(0L)));
+
+ updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken));
+
+ for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) {
+ updateStore.saveJobUpdateEvent(key, updateEvent);
+ }
+
+ for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
+ updateStore.saveJobInstanceUpdateEvent(key, instanceEvent);
+ }
+ }
+ });
+ return keyBuilder.build();
+ }
static final class Builder {
- private static final String USER = "user";
+ static final String USER = "user";
private int numEvents = 1;
private int numInstanceEvents = 5000;
private int numInstanceOverrides = 1;
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index ca484fa..2c56b2e 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -13,32 +13,26 @@
*/
package org.apache.aurora.benchmark;
-import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.db.DbModule;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore;
import org.apache.thrift.TException;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -68,6 +62,7 @@ public class SnapshotBenchmarks {
public static class RestoreSnapshotWithUpdatesBenchmark {
private SnapshotStoreImpl snapshotStore;
private Snapshot snapshot;
+ private Storage storage;
@Param({"1", "5", "10"})
private int updateCount;
@@ -88,44 +83,34 @@ public class SnapshotBenchmarks {
// Return non-guessable result to satisfy "blackhole" requirement.
return System.currentTimeMillis() % 5 == 0;
}
- }
-
- private static SnapshotStoreImpl getSnapshotStore() {
- Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class);
- Injector injector = Guice.createInjector(
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
- bind(StatsProvider.class).toInstance(new FakeStatsProvider());
- bind(SnapshotStoreImpl.class).in(Singleton.class);
- }
- },
- DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory))));
- Storage storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
- storage.prepare();
- return injector.getInstance(SnapshotStoreImpl.class);
- }
+ private SnapshotStoreImpl getSnapshotStore() {
+ Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class);
+ Injector injector = Guice.createInjector(
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+ bind(SnapshotStoreImpl.class).in(Singleton.class);
+ bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class)
+ .toInstance(true);
+ }
+ },
+ DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory))));
- private static Snapshot createSnapshot(int updates, int events, int instanceEvents) {
- Set<IJobUpdateDetails> updateDetails = new JobUpdates.Builder()
- .setNumEvents(events)
- .setNumInstanceEvents(instanceEvents)
- .build(updates);
+ storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
+ storage.prepare();
+ return injector.getInstance(SnapshotStoreImpl.class);
+ }
- ImmutableSet.Builder<Lock> lockBuilder = ImmutableSet.builder();
- ImmutableSet.Builder<StoredJobUpdateDetails> detailsBuilder = ImmutableSet.builder();
- for (IJobUpdateDetails details : updateDetails) {
- IJobUpdateKey key = details.getUpdate().getSummary().getKey();
- String lockToken = UUID.randomUUID().toString();
+ private Snapshot createSnapshot(int updates, int events, int instanceEvents) {
+ JobUpdates.saveUpdates(storage, new JobUpdates.Builder()
+ .setNumEvents(events)
+ .setNumInstanceEvents(instanceEvents)
+ .build(updates));
- lockBuilder.add(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, "user", 0L));
- detailsBuilder.add(new StoredJobUpdateDetails(details.newBuilder(), lockToken));
+ return snapshotStore.createSnapshot();
}
-
- return new Snapshot()
- .setLocks(lockBuilder.build())
- .setJobUpdateDetails(detailsBuilder.build());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index 92849d9..e5228ae 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -14,24 +14,15 @@
package org.apache.aurora.benchmark;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.thrift.TException;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -48,8 +39,6 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
public class UpdateStoreBenchmarks {
- private static final String USER = "user";
-
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@@ -70,12 +59,9 @@ public class UpdateStoreBenchmarks {
@Setup(Level.Iteration)
public void setUpIteration() {
- storage.write((NoResult.Quiet) storeProvider -> {
- Set<IJobUpdateDetails> updates =
- new JobUpdates.Builder().setNumInstanceEvents(instances).build(1);
-
- keys = saveToStore(updates, storeProvider);
- });
+ keys = JobUpdates.saveUpdates(
+ storage,
+ new JobUpdates.Builder().setNumInstanceEvents(instances).build(1));
}
@TearDown(Level.Iteration)
@@ -113,12 +99,9 @@ public class UpdateStoreBenchmarks {
@Setup(Level.Iteration)
public void setUpIteration() {
- storage.write((NoResult.Quiet) storeProvider -> {
- Set<IJobUpdateDetails> updates =
- new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1);
-
- keys = saveToStore(updates, storeProvider);
- });
+ keys = JobUpdates.saveUpdates(
+ storage,
+ new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1));
}
@TearDown(Level.Iteration)
@@ -135,30 +118,4 @@ public class UpdateStoreBenchmarks {
Iterables.getOnlyElement(keys)).get());
}
}
-
- private static Set<IJobUpdateKey> saveToStore(
- Set<IJobUpdateDetails> updates,
- Storage.MutableStoreProvider storeProvider) {
-
- JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
- ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
- for (IJobUpdateDetails details : updates) {
- IJobUpdateKey key = details.getUpdate().getSummary().getKey();
- keyBuilder.add(key);
- String lockToken = UUID.randomUUID().toString();
- storeProvider.getLockStore().saveLock(
- ILock.build(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, USER, 0L)));
-
- updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken));
-
- for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) {
- updateStore.saveJobUpdateEvent(key, updateEvent);
- }
-
- for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
- updateStore.saveJobInstanceUpdateEvent(key, instanceEvent);
- }
- }
- return keyBuilder.build();
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 5124d17..859c964 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -65,6 +65,19 @@ public interface Storage {
QuotaStore.Mutable getQuotaStore();
AttributeStore.Mutable getAttributeStore();
JobUpdateStore.Mutable getJobUpdateStore();
+
+ /**
+ * Gets direct low level access to the underlying storage.
+ * <p>
+ * This grants potentially dangerous direct access to the underlying storage and should
+ * only be used during storage initialization when unstructured bulk data manipulations
+ * are required.
+ * </p>
+ *
+ * @param <T> Direct access type.
+ * @return Direct read/write accessor to the storage.
+ */
+ <T> T getUnsafeStoreAccess();
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 46b3d10..5c7d92f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -73,7 +73,10 @@ interface TemporaryStorage {
final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(
buildInfo,
clock,
- storage);
+ storage,
+ // Safe to pass false here to default to the non-experimental task store
+ // during restore from backup procedure.
+ false /** useDbSnapshotForTaskStore */);
snapshotStore.applySnapshot(snapshot);
return new TemporaryStorage() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 6d8fa11..ff663fa 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -68,7 +68,7 @@ public final class DbModule extends PrivateModule {
@CmdLine(name = "use_beta_db_task_store",
help = "Whether to use the experimental database-backed task store.")
- private static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false);
+ public static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false);
@CmdLine(name = "slow_query_log_threshold",
help = "Log all queries that take at least this long to execute.")
@@ -115,8 +115,6 @@ public final class DbModule extends PrivateModule {
Map<String, String> args = ImmutableMap.<String, String>builder()
.putAll(jdbcUriArgs)
- // We always disable the MvStore, as it is in beta as of this writing.
- .put("MV_STORE", "false")
// READ COMMITTED transaction isolation. More details here
// http://www.h2database.com/html/advanced.html?#transaction_isolation
.put("LOCK_MODE", "3")
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index cca92dd..360914e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -134,6 +134,12 @@ class DbStorage extends AbstractIdleService implements Storage {
public JobUpdateStore.Mutable getJobUpdateStore() {
return jobUpdateStore;
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> T getUnsafeStoreAccess() {
+ return (T) sessionFactory.getConfiguration().getEnvironment().getDataSource();
+ }
};
this.statsProvider = requireNonNull(statsProvider);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index ed63a74..7dcd1bf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -31,8 +31,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.db.DbModule;
import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
+import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore;
import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
@@ -67,6 +69,9 @@ public class LogStorageModule extends PrivateModule {
bind(Settings.class)
.toInstance(new Settings(SHUTDOWN_GRACE_PERIOD.get(), SNAPSHOT_INTERVAL.get()));
+ bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class)
+ .toInstance(DbModule.USE_DB_TASK_STORE.get());
+
bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
.toInstance(MAX_LOG_ENTRY_SIZE.get());
bind(LogManager.class).in(Singleton.class);
@@ -77,6 +82,7 @@ public class LogStorageModule extends PrivateModule {
expose(Storage.class);
expose(NonVolatileStorage.class);
expose(DistributedSnapshotStore.class);
+ expose(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class);
bind(EntrySerializer.class).to(EntrySerializerImpl.class);
// TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index db90150..6fee251 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -13,13 +13,28 @@
*/
package org.apache.aurora.scheduler.storage.log;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import javax.inject.Inject;
+import javax.inject.Qualifier;
+import javax.sql.DataSource;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.util.BuildInfo;
@@ -40,7 +55,6 @@ import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -64,17 +78,94 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
- private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
+ /**
+ * Number of rows to run in a single batch during dbsnapshot restore.
+ */
+ private static final int DB_BATCH_SIZE = 20;
+
+ private static boolean hasDbSnapshot(Snapshot snapshot) {
+ return snapshot.isSetDbScript();
+ }
+
+ private boolean hasDbTaskStore(Snapshot snapshot) {
+ return useDbSnapshotForTaskStore
+ && hasDbSnapshot(snapshot)
+ && snapshot.isExperimentalTaskStore();
+ }
+
+ private final Iterable<SnapshotField> snapshotFields = Arrays.asList(
+ // Order is critical here. The DB snapshot should always be tried first to ensure
+ // graceful migration to DBTaskStore. Otherwise, there is a direct risk of losing the cluster.
+ // The following scenario illustrates how that can happen:
+ // - Dbsnapshot:ON, DBTaskStore:OFF
+ // - Scheduler is updated with DBTaskStore:ON, restarts and populates all tasks from snapshot
+ // - Should the dbsnapshot get applied last, all tables would be dropped and recreated BUT
+ // since there was no task data stored in dbsnapshot (DBTaskStore was OFF last time
+ // snapshot was cut), all tasks would be erased
+ // - If the above is not detected before a new snapshot is cut all tasks will be dropped the
+ // moment a new snapshot is created
+ new SnapshotField() {
+ @Override
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ LOG.info("Saving dbsnapshot");
+ // Note: we don't use mybatis mapped statements for performance reasons and to avoid
+ // mapping/unmapping hassle as snapshot commands should never be used upstream.
+ try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) {
+ try (PreparedStatement ps = c.prepareStatement("SCRIPT")) {
+ try (ResultSet rs = ps.executeQuery()) {
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ while (rs.next()) {
+ String columnValue = rs.getString("SCRIPT");
+ builder.add(columnValue + "\n");
+ }
+ snapshot.setDbScript(builder.build());
+ }
+ }
+ } catch (SQLException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+
+ if (snapshot.isSetDbScript()) {
+ try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) {
+ LOG.info("Dropping all tables");
+ try (PreparedStatement drop = c.prepareStatement("DROP ALL OBJECTS")) {
+ drop.executeUpdate();
+ }
+
+ LOG.info("Restoring dbsnapshot. Row count: " + snapshot.getDbScript().size());
+ // Partition the restore script into manageable size batches to avoid possible OOM
+ // due to large size DML statement.
+ List<List<String>> batches = Lists.partition(snapshot.getDbScript(), DB_BATCH_SIZE);
+ for (List<String> batch : batches) {
+ try (PreparedStatement restore = c.prepareStatement(Joiner.on("").join(batch))) {
+ restore.executeUpdate();
+ }
+ }
+ } catch (SQLException e) {
+ Throwables.propagate(e);
+ }
+ }
+ }
+ },
new SnapshotField() {
// It's important for locks to be replayed first, since there are relations that expect
// references to be valid on insertion.
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbSnapshot(snapshot)) {
+ LOG.info("Deferring lock restore to dbsnapshot");
+ return;
+ }
+
store.getLockStore().deleteLocks();
if (snapshot.isSetLocks()) {
@@ -86,38 +177,42 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
snapshot.setHostAttributes(
- IHostAttributes.toBuildersSet(storeProvider.getAttributeStore().getHostAttributes()));
+ IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbSnapshot(snapshot)) {
+ LOG.info("Deferring attribute restore to dbsnapshot");
+ return;
+ }
+
store.getAttributeStore().deleteHostAttributes();
if (snapshot.isSetHostAttributes()) {
for (HostAttributes attributes : snapshot.getHostAttributes()) {
- // Prior to commit 5cf760b, the store would persist maintenance mode changes for
- // unknown hosts. 5cf760b began rejecting these, but the replicated log may still
- // contain entries with a null slave ID.
- if (attributes.isSetSlaveId()) {
- store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
- } else {
- LOG.info("Dropping host attributes with no slave ID: " + attributes);
- }
+ store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
}
}
}
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
snapshot.setTasks(
IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
+ snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore);
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbTaskStore(snapshot)) {
+ LOG.info("Deferring task restore to dbsnapshot");
+ return;
+ }
+
store.getUnsafeTaskStore().deleteAllTasks();
if (snapshot.isSetTasks()) {
@@ -128,17 +223,23 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
jobs.add(new StoredCronJob(config.newBuilder()));
}
snapshot.setCronJobs(jobs.build());
+ snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore);
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbTaskStore(snapshot)) {
+ LOG.info("Deferring cron job restore to dbsnapshot");
+ return;
+ }
+
store.getCronJobStore().deleteJobs();
if (snapshot.isSetCronJobs()) {
@@ -151,12 +252,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
// SchedulerMetadata is updated outside of the static list of SnapshotFields
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbSnapshot(snapshot)) {
+ LOG.info("Deferring metadata restore to dbsnapshot");
+ return;
+ }
+
if (snapshot.isSetSchedulerMetadata()
&& snapshot.getSchedulerMetadata().isSetFrameworkId()) {
// No delete necessary here since this is a single value.
@@ -168,7 +274,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
for (Map.Entry<String, IResourceAggregate> entry
: store.getQuotaStore().fetchQuotas().entrySet()) {
@@ -181,6 +287,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbSnapshot(snapshot)) {
+ LOG.info("Deferring quota restore to dbsnapshot");
+ return;
+ }
+
store.getQuotaStore().deleteQuotas();
if (snapshot.isSetQuotaConfigurations()) {
@@ -193,12 +304,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
},
new SnapshotField() {
@Override
- public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails());
}
@Override
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ if (hasDbSnapshot(snapshot)) {
+ LOG.info("Deferring job update restore to dbsnapshot");
+ return;
+ }
+
JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
updateStore.deleteAllUpdatesAndEvents();
@@ -233,12 +349,27 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
private final BuildInfo buildInfo;
private final Clock clock;
private final Storage storage;
+ private final boolean useDbSnapshotForTaskStore;
+
+ /**
+ * Identifies if experimental task store is in use.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ ElementType.PARAMETER, ElementType.METHOD })
+ @Qualifier
+ public @interface ExperimentalTaskStore { }
@Inject
- public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock, @Volatile Storage storage) {
+ public SnapshotStoreImpl(
+ BuildInfo buildInfo,
+ Clock clock,
+ @Volatile Storage storage,
+ @ExperimentalTaskStore boolean useDbSnapshotForTaskStore) {
+
this.buildInfo = requireNonNull(buildInfo);
this.clock = requireNonNull(clock);
this.storage = requireNonNull(storage);
+ this.useDbSnapshotForTaskStore = useDbSnapshotForTaskStore;
}
@Timed("snapshot_create")
@@ -252,7 +383,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
// Capture timestamp to signify the beginning of a snapshot operation, apply after in case
// one of the field closures is mean and tries to apply a timestamp.
long timestamp = clock.nowMillis();
- for (SnapshotField field : SNAPSHOT_FIELDS) {
+ for (SnapshotField field : snapshotFields) {
field.saveToSnapshot(storeProvider, snapshot);
}
@@ -274,14 +405,14 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
storage.write((NoResult.Quiet) storeProvider -> {
LOG.info("Restoring snapshot.");
- for (SnapshotField field : SNAPSHOT_FIELDS) {
+ for (SnapshotField field : snapshotFields) {
field.restoreFromSnapshot(storeProvider, snapshot);
}
});
}
private interface SnapshotField {
- void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
+ void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 2f07afb..d0de063 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -381,4 +381,10 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements
public JobUpdateStore.Mutable getJobUpdateStore() {
return this;
}
+
+ @Override
+ public <T> T getUnsafeStoreAccess() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 172dd20..a33f6f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -96,7 +96,8 @@ public class RecoveryTest extends EasyMockTest {
expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
- distributedStore.persist(SNAPSHOT1);
+ Capture<Snapshot> snapshot = createCapture();
+ distributedStore.persist(capture(snapshot));
shutDownNow.execute();
control.replay();
@@ -114,6 +115,9 @@ public class RecoveryTest extends EasyMockTest {
recovery.query(Query.unscoped()));
recovery.commit();
transaction.getValue().apply(storeProvider);
+
+ snapshot.getValue().unsetDbScript();
+ assertEquals(SNAPSHOT1, snapshot.getValue());
}
@Test
@@ -122,7 +126,8 @@ public class RecoveryTest extends EasyMockTest {
Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder()));
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
- distributedStore.persist(modified);
+ Capture<Snapshot> snapshot = createCapture();
+ distributedStore.persist(capture(snapshot));
shutDownNow.execute();
control.replay();
@@ -140,6 +145,9 @@ public class RecoveryTest extends EasyMockTest {
recovery.query(Query.unscoped()));
recovery.commit();
transaction.getValue().apply(storeProvider);
+
+ snapshot.getValue().unsetDbScript();
+ assertEquals(modified, snapshot.getValue());
}
@Test(expected = RecoveryException.class)
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
new file mode 100644
index 0000000..6a39d89
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.StoredCronJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.ResourceAggregates;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.InMemStoresModule;
+import org.junit.Test;
+
+import static org.apache.aurora.common.inject.Bindings.KeyFactory.PLAIN;
+import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.storage.db.DbModule.testModule;
+import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorage;
+import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorageInjector;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SnapshotStoreImplIT {
+
+ private static final long NOW = 10335463456L;
+ private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
+
+ private Storage storage;
+ private SnapshotStore<Snapshot> snapshotStore;
+
+ private void setUpStore(boolean dbTaskStore) {
+ storage = dbTaskStore
+ ? createStorage()
+ : createStorageInjector(
+ testModule(PLAIN, Optional.of(new InMemStoresModule(PLAIN)))).getInstance(Storage.class);
+
+ FakeClock clock = new FakeClock();
+ clock.setNowMillis(NOW);
+ snapshotStore = new SnapshotStoreImpl(
+ generateBuildInfo(),
+ clock,
+ storage,
+ dbTaskStore);
+ }
+
+ private static Snapshot makeComparable(Snapshot snapshot) {
+ Snapshot copy = snapshot.deepCopy();
+ // Ignore DB snapshot. It will be tested by asserting the DB data.
+ copy.unsetDbScript();
+ copy.setExperimentalTaskStore(false);
+ return copy;
+ }
+
+ @Test
+ public void testNoDBTaskStore() {
+ setUpStore(false);
+ populateStore();
+
+ Snapshot snapshot1 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot1));
+ assertFalse(snapshot1.isExperimentalTaskStore());
+
+ snapshotStore.applySnapshot(snapshot1);
+ Snapshot snapshot2 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot2));
+ assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+ }
+
+ @Test
+ public void testMigrateToDBTaskStore() {
+ setUpStore(false);
+ populateStore();
+
+ Snapshot snapshot1 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot1));
+ assertFalse(snapshot1.isExperimentalTaskStore());
+
+ setUpStore(true);
+ snapshotStore.applySnapshot(snapshot1);
+ Snapshot snapshot2 = snapshotStore.createSnapshot();
+ assertTrue(snapshot2.isExperimentalTaskStore());
+ assertEquals(expected(), makeComparable(snapshot2));
+ assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+ }
+
+ @Test
+ public void testMigrateFromDBTaskStore() {
+ setUpStore(true);
+ populateStore();
+
+ Snapshot snapshot1 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot1));
+ assertTrue(snapshot1.isExperimentalTaskStore());
+
+ setUpStore(false);
+ snapshotStore.applySnapshot(snapshot1);
+ Snapshot snapshot2 = snapshotStore.createSnapshot();
+ assertFalse(snapshot2.isExperimentalTaskStore());
+ assertEquals(expected(), makeComparable(snapshot2));
+ assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+ }
+
+ @Test
+ public void testDBTaskStore() {
+ setUpStore(true);
+ populateStore();
+
+ Snapshot snapshot1 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot1));
+ assertTrue(snapshot1.isExperimentalTaskStore());
+
+ snapshotStore.applySnapshot(snapshot1);
+ Snapshot snapshot2 = snapshotStore.createSnapshot();
+ assertEquals(expected(), makeComparable(snapshot2));
+ assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+ }
+
+ private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
+ private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY);
+ private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration()
+ .setKey(new JobKey("owner", "env", "name"))
+ .setOwner(new Identity("user"))
+ .setCronSchedule("* * * * *")
+ .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING)
+ .setInstanceCount(1)
+ .setTaskConfig(TASK_CONFIG.newBuilder()));
+ private static final String ROLE = "role";
+ private static final IResourceAggregate QUOTA = ResourceAggregates.LARGE;
+ private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
+ new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
+ .setMode(MaintenanceMode.NONE)
+ .setSlaveId("slave id"));
+ private static final String FRAMEWORK_ID = "framework_id";
+ private static final Map<String, String> METADATA = ImmutableMap.of(
+ FakeBuildInfo.DATE, FakeBuildInfo.DATE,
+ FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION,
+ FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
+ private static final ILock LOCK = ILock.build(new Lock()
+ .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder()))
+ .setToken("lockId")
+ .setUser("testUser")
+ .setTimestampMs(12345L));
+ private static final IJobUpdateKey UPDATE_ID =
+ IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1"));
+ private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails()
+ .setUpdate(new JobUpdate()
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(TASK_CONFIG.newBuilder())
+ .setInstances(ImmutableSet.of(new Range(0, 7))))
+ .setInitialState(ImmutableSet.of(
+ new InstanceTaskConfig()
+ .setInstances(ImmutableSet.of(new Range(0, 1)))
+ .setTask(TASK_CONFIG.newBuilder())))
+ .setSettings(new JobUpdateSettings()
+ .setBlockIfNoPulsesAfterMs(500)
+ .setUpdateGroupSize(1)
+ .setMaxPerInstanceFailures(1)
+ .setMaxFailedInstances(1)
+ .setMinWaitInInstanceRunningMs(200)
+ .setRollbackOnFailure(true)
+ .setWaitForBatchCompletion(true)
+ .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)))))
+ .setSummary(new JobUpdateSummary()
+ .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
+ .setUser("user")
+ .setKey(UPDATE_ID.newBuilder())))
+ .setUpdateEvents(ImmutableList.of(new JobUpdateEvent()
+ .setUser("user")
+ .setMessage("message")
+ .setStatus(JobUpdateStatus.ERROR)))
+ .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent()
+ .setAction(JobUpdateAction.INSTANCE_UPDATED))));
+
+ private Snapshot expected() {
+ return new Snapshot()
+ .setTimestamp(NOW)
+ .setTasks(ImmutableSet.of(TASK.newBuilder()))
+ .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder())))
+ .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder()))
+ .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder())))
+ .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA))
+ .setLocks(ImmutableSet.of(LOCK.newBuilder()))
+ .setJobUpdateDetails(ImmutableSet.of(
+ new StoredJobUpdateDetails(UPDATE.newBuilder(), LOCK.getToken())));
+ }
+
+ private void populateStore() {
+ storage.write((NoResult.Quiet) store -> {
+ store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(TASK));
+ store.getCronJobStore().saveAcceptedJob(CRON_JOB);
+ store.getQuotaStore().saveQuota(ROLE, QUOTA);
+ store.getAttributeStore().saveHostAttributes(ATTRIBUTES);
+ store.getSchedulerStore().saveFrameworkId(FRAMEWORK_ID);
+ store.getLockStore().saveLock(LOCK);
+ store.getJobUpdateStore().saveJobUpdate(UPDATE.getUpdate(), Optional.of(LOCK.getToken()));
+ store.getJobUpdateStore().saveJobUpdateEvent(
+ UPDATE.getUpdate().getSummary().getKey(),
+ UPDATE.getUpdateEvents().get(0));
+ store.getJobUpdateStore().saveJobInstanceUpdateEvent(
+ UPDATE.getUpdate().getSummary().getKey(),
+ UPDATE.getInstanceEvents().get(0)
+ );
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
deleted file mode 100644
index 4407867..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.testing.FakeBuildInfo;
-import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateDetails;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.storage.QuotaConfiguration;
-import org.apache.aurora.gen.storage.SchedulerMetadata;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.ResourceAggregates;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class SnapshotStoreImplTest extends EasyMockTest {
-
- private static final long NOW = 10335463456L;
- private static final JobKey JOB_KEY = JobKeys.from("role", "env", "job").newBuilder();
-
- private StorageTestUtil storageUtil;
- private SnapshotStore<Snapshot> snapshotStore;
-
- @Before
- public void setUp() {
- FakeClock clock = new FakeClock();
- clock.setNowMillis(NOW);
- storageUtil = new StorageTestUtil(this);
- snapshotStore = new SnapshotStoreImpl(
- generateBuildInfo(),
- clock,
- storageUtil.storage);
- }
-
- private static IJobUpdateKey makeKey(String id) {
- return IJobUpdateKey.build(new JobUpdateKey(JOB_KEY, id));
- }
-
- @Test
- public void testCreateAndRestoreNewSnapshot() {
- ImmutableSet<IScheduledTask> tasks = ImmutableSet.of(
- IScheduledTask.build(new ScheduledTask().setStatus(ScheduleStatus.PENDING)));
-
- Set<QuotaConfiguration> quotas =
- ImmutableSet.of(
- new QuotaConfiguration("steve", ResourceAggregates.EMPTY.newBuilder()));
- IHostAttributes attribute = IHostAttributes.build(
- new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
- .setSlaveId("slave id"));
- // A legacy attribute that has a maintenance mode set, but nothing else. These should be
- // dropped.
- IHostAttributes legacyAttribute = IHostAttributes.build(
- new HostAttributes("host", ImmutableSet.of()));
- StoredCronJob job = new StoredCronJob(
- new JobConfiguration().setKey(new JobKey("owner", "env", "name")));
- String frameworkId = "framework_id";
- ILock lock = ILock.build(new Lock()
- .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder()))
- .setToken("lockId")
- .setUser("testUser")
- .setTimestampMs(12345L));
- SchedulerMetadata metadata = new SchedulerMetadata().setFrameworkId(frameworkId);
- metadata.setDetails(Maps.newHashMap());
- metadata.getDetails().put(FakeBuildInfo.DATE, FakeBuildInfo.DATE);
- metadata.getDetails().put(FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION);
- metadata.getDetails().put(FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
- IJobUpdateKey updateId1 = makeKey("updateId1");
- IJobUpdateKey updateId2 = makeKey("updateId2");
- IJobUpdateDetails updateDetails1 = IJobUpdateDetails.build(new JobUpdateDetails()
- .setUpdate(new JobUpdate().setSummary(
- new JobUpdateSummary().setKey(updateId1.newBuilder())))
- .setUpdateEvents(ImmutableList.of(new JobUpdateEvent().setStatus(JobUpdateStatus.ERROR)))
- .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent().setTimestampMs(123L))));
-
- IJobUpdateDetails updateDetails2 = IJobUpdateDetails.build(new JobUpdateDetails()
- .setUpdate(new JobUpdate().setSummary(
- new JobUpdateSummary().setKey(updateId2.newBuilder()))));
-
- storageUtil.expectOperations();
- expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(tasks);
- expect(storageUtil.quotaStore.fetchQuotas())
- .andReturn(ImmutableMap.of("steve", ResourceAggregates.EMPTY));
- expect(storageUtil.attributeStore.getHostAttributes())
- .andReturn(ImmutableSet.of(attribute, legacyAttribute));
- expect(storageUtil.jobStore.fetchJobs())
- .andReturn(ImmutableSet.of(IJobConfiguration.build(job.getJobConfiguration())));
- expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(frameworkId));
- expect(storageUtil.lockStore.fetchLocks()).andReturn(ImmutableSet.of(lock));
- String lockToken = "token";
- expect(storageUtil.jobUpdateStore.fetchAllJobUpdateDetails())
- .andReturn(ImmutableSet.of(
- new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken),
- new StoredJobUpdateDetails(updateDetails2.newBuilder(), null)));
-
- expectDataWipe();
- storageUtil.taskStore.saveTasks(tasks);
- storageUtil.quotaStore.saveQuota("steve", ResourceAggregates.EMPTY);
- expect(storageUtil.attributeStore.saveHostAttributes(attribute)).andReturn(true);
- storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(job.getJobConfiguration()));
- storageUtil.schedulerStore.saveFrameworkId(frameworkId);
- storageUtil.lockStore.saveLock(lock);
- storageUtil.jobUpdateStore.saveJobUpdate(
- updateDetails1.getUpdate(), Optional.fromNullable(lockToken));
- storageUtil.jobUpdateStore.saveJobUpdateEvent(
- updateId1,
- Iterables.getOnlyElement(updateDetails1.getUpdateEvents()));
- storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
- updateId1,
- Iterables.getOnlyElement(updateDetails1.getInstanceEvents()));
-
- // The saved object for update2 should be backfilled with update key.
- JobUpdate update2Expected = updateDetails2.getUpdate().newBuilder();
- update2Expected.getSummary().setKey(updateId2.newBuilder());
- storageUtil.jobUpdateStore.saveJobUpdate(
- IJobUpdate.build(update2Expected), Optional.absent());
-
- control.replay();
-
- Snapshot expected = new Snapshot()
- .setTimestamp(NOW)
- .setTasks(IScheduledTask.toBuildersSet(tasks))
- .setQuotaConfigurations(quotas)
- .setHostAttributes(ImmutableSet.of(attribute.newBuilder(), legacyAttribute.newBuilder()))
- .setCronJobs(ImmutableSet.of(job))
- .setSchedulerMetadata(metadata)
- .setLocks(ImmutableSet.of(lock.newBuilder()))
- .setJobUpdateDetails(ImmutableSet.of(
- new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken),
- new StoredJobUpdateDetails(updateDetails2.newBuilder(), null)));
-
- Snapshot snapshot = snapshotStore.createSnapshot();
- assertEquals(expected, snapshot);
-
- snapshotStore.applySnapshot(expected);
- }
-
- private void expectDataWipe() {
- storageUtil.taskStore.deleteAllTasks();
- storageUtil.quotaStore.deleteQuotas();
- storageUtil.attributeStore.deleteHostAttributes();
- storageUtil.jobStore.deleteJobs();
- storageUtil.lockStore.deleteLocks();
- storageUtil.jobUpdateStore.deleteAllUpdatesAndEvents();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 75130a3..b469f9b 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -150,6 +150,7 @@ test_update() {
assert_update_state $_jobkey 'ROLLING_FORWARD'
local _update_id=$(aurora update list $_jobkey --status ROLLING_FORWARD \
| tail -n +2 | awk '{print $2}')
+ aurora_admin scheduler_snapshot devcluster
sudo restart aurora-scheduler
assert_update_state $_jobkey 'ROLLING_FORWARD'
aurora update pause $_jobkey --message='hello'